Support for subscription callback 06/3106/4 v0.4.6
authorMohamed Abukar <abukar.mohamed@nokia.com>
Thu, 2 Apr 2020 07:08:14 +0000 (10:08 +0300)
committerMohamed Abukar <abukar.mohamed@nokia.com>
Mon, 6 Apr 2020 04:38:24 +0000 (07:38 +0300)
Change-Id: If35eb7b2ee3e4841dfdcb9345172aa337afaf140
Signed-off-by: Mohamed Abukar <abukar.mohamed@nokia.com>
ci/Dockerfile
config/config-file.yaml
pkg/xapp/subscription.go
pkg/xapp/subscription_test.go
pkg/xapp/xapp_test.go

index 7b95438..8303114 100755 (executable)
@@ -35,7 +35,7 @@ RUN apt-get update -y \
 RUN curl -s https://packagecloud.io/install/repositories/o-ran-sc/master/script.deb.sh | bash
 
 # RMR
-ARG RMRVERSION=3.6.0
+ARG RMRVERSION=3.6.5
 #RUN apt-get install -y rmr=${RMRVERSION} rmr-dev=${RMRVERSION}
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb
index 42236ec..6f163c3 100755 (executable)
@@ -26,6 +26,8 @@
 "subscription":
     "host": "localhost:8088"
     "timeout": 2
+    "retryCount": 10
+    "retryDelay": 5
 "db":
   "namespace": "sdl"
   "waitForSdl": false
index 6ceabf9..cf8dc04 100755 (executable)
 package xapp
 
 import (
+       "bytes"
        "encoding/json"
        "fmt"
        "github.com/go-openapi/loads"
        httptransport "github.com/go-openapi/runtime/client"
        "github.com/go-openapi/runtime/middleware"
        "github.com/go-openapi/strfmt"
+       "github.com/spf13/viper"
        "io/ioutil"
        "net/http"
+       "strings"
        "time"
 
        apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
@@ -48,6 +51,7 @@ import (
 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
 type SubscriptionDeleteHandler func(string) error
+type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
 
 type Subscriber struct {
        localAddr  string
@@ -56,6 +60,8 @@ type Subscriber struct {
        remoteUrl  string
        remoteProt []string
        timeout    time.Duration
+       clientUrl  string
+       clientCB   SubscriptionResponseCallback
 }
 
 func NewSubscriber(host string, timo int) *Subscriber {
@@ -67,18 +73,34 @@ func NewSubscriber(host string, timo int) *Subscriber {
                timo = 20
        }
 
-       return &Subscriber{
+       r := &Subscriber{
                remoteHost: host,
                remoteUrl:  "/ric/v1",
                remoteProt: []string{"http"},
                timeout:    time.Duration(timo) * time.Second,
                localAddr:  "0.0.0.0",
                localPort:  8088,
+               clientUrl:  "/ric/v1/subscriptions/response",
+       }
+       Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
+
+       return r
+}
+
+func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
+       if req.Body != nil {
+               var resp apimodel.SubscriptionResponse
+               if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
+                       if r.clientCB != nil {
+                               r.clientCB(&resp)
+                       }
+               }
+               req.Body.Close()
        }
 }
 
 // Server interface: listen and receive subscription requests
-func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error {
+func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
        swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
        if err != nil {
                return err
@@ -86,10 +108,10 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle
 
        api := operations.NewXappFrameworkAPI(swaggerSpec)
 
-       // Subscription: query
+       // Subscription: Query
        api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
                func(p query.GetAllSubscriptionsParams) middleware.Responder {
-                       if resp, err := get(); err == nil {
+                       if resp, err := getSubscription(); err == nil {
                                return query.NewGetAllSubscriptionsOK().WithPayload(resp)
                        }
                        return query.NewGetAllSubscriptionsInternalServerError()
@@ -98,25 +120,25 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle
        // SubscriptionType: Report
        api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
                func(p report.SubscribeReportParams) middleware.Responder {
-                       if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil {
+                       if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
                                return report.NewSubscribeReportCreated().WithPayload(resp)
                        }
                        return report.NewSubscribeReportInternalServerError()
                })
 
-       // SubscriptionType: policy
+       // SubscriptionType: Policy
        api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
                func(p policy.SubscribePolicyParams) middleware.Responder {
-                       if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
+                       if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
                                return policy.NewSubscribePolicyCreated().WithPayload(resp)
                        }
                        return policy.NewSubscribePolicyInternalServerError()
                })
 
-       // SubscriptionType: delete
+       // SubscriptionType: Delete
        api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
                func(p common.UnsubscribeParams) middleware.Responder {
-                       if err := del(p.SubscriptionID); err == nil {
+                       if err := delSubscription(p.SubscriptionID); err == nil {
                                return common.NewUnsubscribeNoContent()
                        }
                        return common.NewUnsubscribeInternalServerError()
@@ -134,6 +156,50 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle
        return nil
 }
 
+// Server interface: send notification to client
+func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
+       respData, err := json.Marshal(resp)
+       if err != nil {
+               Logger.Error("json.Marshal failed: %v", err)
+               return err
+       }
+
+       port := strings.Split(viper.GetString("local.host"), ":")[1]
+       clientUrl := fmt.Sprintf("http://%s:%s%s", clientEndpoint, port, r.clientUrl)
+
+       retries := viper.GetInt("subscription.retryCount")
+       if retries == 0 {
+               retries = 10
+       }
+
+       delay := viper.GetInt("subscription.retryDelay")
+       if delay == 0 {
+               delay = 5
+       }
+
+       for i := 0; i < retries; i++ {
+               r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
+               if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
+                       break
+               }
+
+               if err != nil {
+                       Logger.Error("%v", err)
+               }
+               if r != nil && r.StatusCode != http.StatusOK {
+                       Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
+               }
+               time.Sleep(time.Duration(delay) * time.Second)
+       }
+
+       return err
+}
+
+// Subscription interface for xApp: Response callback
+func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
+       r.clientCB = c
+}
+
 // Subscription interface for xApp: REPORT
 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
        params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
index b9713af..a5601cc 100755 (executable)
@@ -7,28 +7,29 @@
 package xapp
 
 import (
-       apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
        "github.com/stretchr/testify/assert"
        "testing"
        "time"
+       "fmt"
 )
 
 var suite *testing.T
 
 var meid = "gnb123456"
 var funId = int64(1)
-var clientEndpoint = "localhost:4561"
+var clientEndpoint = "localhost"
 var direction = int64(0)
 var procedureCode = int64(27)
 var typeOfMessage = int64(1)
 
-var reportParams = apimodel.ReportParams{
-       Meid: meid,
-       RANFunctionID: &funId,
+var reportParams = clientmodel.ReportParams{
+       Meid:           meid,
+       RANFunctionID:  &funId,
        ClientEndpoint: &clientEndpoint,
-       EventTriggers: apimodel.EventTriggerList{
-               &apimodel.EventTrigger{
+       EventTriggers: clientmodel.EventTriggerList{
+               &clientmodel.EventTrigger{
                        InterfaceDirection: direction,
                        ProcedureCode:      procedureCode,
                        TypeOfMessage:      typeOfMessage,
@@ -36,18 +37,34 @@ var reportParams = apimodel.ReportParams{
        },
 }
 
-var policyParams = apimodel.PolicyParams{
-       Meid: &meid,
-       RANFunctionID: &funId,
+var policyParams = clientmodel.PolicyParams{
+       Meid:           &meid,
+       RANFunctionID:  &funId,
        ClientEndpoint: &clientEndpoint,
-       EventTriggers: apimodel.EventTriggerList{
-               &apimodel.EventTrigger{
+       EventTriggers: clientmodel.EventTriggerList{
+               &clientmodel.EventTrigger{
                        InterfaceDirection: direction,
                        ProcedureCode:      procedureCode,
                        TypeOfMessage:      typeOfMessage,
                },
        },
-       PolicyActionDefinitions: &apimodel.PolicyActionDefinition{},
+       PolicyActionDefinitions: &clientmodel.PolicyActionDefinition{},
+}
+
+func processSubscriptions(subscriptionId string) {
+       // Generate requestorId, instanceId
+       reqId := int64(11)
+       instanceId := int64(22)
+
+       resp := &models.SubscriptionResponse{
+               SubscriptionID: &subscriptionId,
+               SubscriptionInstances: []*models.SubscriptionInstance{
+                       &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId},
+               },
+       }
+
+       // Notify the client: don't worry about errors ... Notify() will handle retries, etc.
+       Subscription.Notify(resp, clientEndpoint)
 }
 
 func subscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
@@ -65,15 +82,15 @@ func subscriptionHandler(stype models.SubscriptionType, params interface{}) (*mo
                assert.Equal(suite, clientEndpoint, *p.ClientEndpoint)
        }
 
-       subId := "xapp-11"
-       reqId := int64(11)
-       instanceId := int64(22)
+       // Generate a unique subscriptionId
+       subscriptionId := fmt.Sprintf("%s-%s", meid, clientEndpoint)
+
+       // Process subscriptions on the background
+       go processSubscriptions(subscriptionId)
+
+       // and send response immediately
        return &models.SubscriptionResponse{
-               SubscriptionID: &subId,
-               SubscriptionInstances: []*models.SubscriptionInstance{
-                       &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId}, 
-                       &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId},
-               },
+               SubscriptionID: &subscriptionId,
        }, nil
 }
 
@@ -113,27 +130,29 @@ func TestSubscriptionQueryHandling(t *testing.T) {
 }
 
 func TestSubscriptionReportHandling(t *testing.T) {
-       resp, err := Subscription.SubscribeReport(&reportParams)
+       Subscription.SetResponseCB(func(resp *clientmodel.SubscriptionResponse) {
+               assert.Equal(t, len(resp.SubscriptionInstances), 1)
+               assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
+               assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
+       })
 
+       _, err := Subscription.SubscribeReport(&reportParams)
        assert.Equal(t, err, nil)
-       assert.Equal(t, len(resp.SubscriptionInstances), 2)
-       assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
-       assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
-       assert.Equal(t, *resp.SubscriptionInstances[1].RequestorID, int64(11))
-       assert.Equal(t, *resp.SubscriptionInstances[1].InstanceID, int64(22))
 }
 
 func TestSubscriptionPolicytHandling(t *testing.T) {
-       resp, err := Subscription.SubscribePolicy(&policyParams)
+       Subscription.SetResponseCB(func(resp *clientmodel.SubscriptionResponse) {
+               assert.Equal(t, len(resp.SubscriptionInstances), 1)
+               assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
+               assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
+       })
 
+       _, err := Subscription.SubscribePolicy(&policyParams)
        assert.Equal(t, err, nil)
-       assert.Equal(t, len(resp.SubscriptionInstances), 2)
-       assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
-       assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
 }
 
 func TestSubscriptionDeleteHandling(t *testing.T) {
        err := Subscription.UnSubscribe(clientEndpoint)
 
        assert.Equal(t, err, nil)
-}
\ No newline at end of file
+}
index e52c3ee..9696e6e 100755 (executable)
@@ -190,44 +190,44 @@ func TestMessagesReceivedSuccessfullyUsingWh(t *testing.T) {
 }
 
 func TestMessagesReceivedSuccessfullyUsingWhCall(t *testing.T) {
-        time.Sleep(time.Duration(5) * time.Second)
-        whid := Rmr.Openwh("localhost:4560")
-                params := &RMRParams{}
-                params.Payload = []byte("newrt|start\nnewrt|end\n")
-                params.Whid = int(whid)
-               params.Callid = 4
-               params.Timeout = 1000
-               Rmr.SendCallMsg(params)
+       time.Sleep(time.Duration(5) * time.Second)
+       whid := Rmr.Openwh("localhost:4560")
+       params := &RMRParams{}
+       params.Payload = []byte("newrt|start\nnewrt|end\n")
+       params.Whid = int(whid)
+       params.Callid = 4
+       params.Timeout = 1000
+       Rmr.SendCallMsg(params)
 
        // Allow time to process the messages
        time.Sleep(time.Duration(2) * time.Second)
 
-        waitForSdl := viper.GetBool("db.waitForSdl")
-        stats := getMetrics(t)
-        if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") {
-                t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats)
-        }
+       waitForSdl := viper.GetBool("db.waitForSdl")
+       stats := getMetrics(t)
+       if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") {
+               t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats)
+       }
 
-        if !strings.Contains(stats, "ricxapp_RMR_Received 201") {
-                t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats)
-        }
+       if !strings.Contains(stats, "ricxapp_RMR_Received 201") {
+               t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats)
+       }
 
-        if !strings.Contains(stats, "ricxapp_RMR_TransmitError 1") {
-                t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect")
-        }
+       if !strings.Contains(stats, "ricxapp_RMR_TransmitError 1") {
+               t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect")
+       }
 
-        if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") {
-                t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect")
-        }
+       if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") {
+               t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect")
+       }
 
-        if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 201") {
-                t.Errorf("Error: ricxapp_SDL_Stored value incorrect")
-        }
+       if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 201") {
+               t.Errorf("Error: ricxapp_SDL_Stored value incorrect")
+       }
 
-        if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") {
-                t.Errorf("Error: ricxapp_SDL_StoreError value incorrect")
-        }
-        Rmr.Closewh(int(whid))
+       if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") {
+               t.Errorf("Error: ricxapp_SDL_StoreError value incorrect")
+       }
+       Rmr.Closewh(int(whid))
 }
 
 func TestSubscribeChannels(t *testing.T) {