From: Mohamed Abukar Date: Thu, 2 Apr 2020 07:08:14 +0000 (+0300) Subject: Support for subscription callback X-Git-Tag: v0.4.6^0 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=5953f7e372df54c71f526e9519e8eb0ee7ee6f72;p=ric-plt%2Fxapp-frame.git Support for subscription callback Change-Id: If35eb7b2ee3e4841dfdcb9345172aa337afaf140 Signed-off-by: Mohamed Abukar --- diff --git a/ci/Dockerfile b/ci/Dockerfile index 7b95438..8303114 100755 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -35,7 +35,7 @@ RUN apt-get update -y \ RUN curl -s https://packagecloud.io/install/repositories/o-ran-sc/master/script.deb.sh | bash # RMR -ARG RMRVERSION=3.6.0 +ARG RMRVERSION=3.6.5 #RUN apt-get install -y rmr=${RMRVERSION} rmr-dev=${RMRVERSION} RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb diff --git a/config/config-file.yaml b/config/config-file.yaml index 42236ec..6f163c3 100755 --- a/config/config-file.yaml +++ b/config/config-file.yaml @@ -26,6 +26,8 @@ "subscription": "host": "localhost:8088" "timeout": 2 + "retryCount": 10 + "retryDelay": 5 "db": "namespace": "sdl" "waitForSdl": false diff --git a/pkg/xapp/subscription.go b/pkg/xapp/subscription.go index 6ceabf9..cf8dc04 100755 --- a/pkg/xapp/subscription.go +++ b/pkg/xapp/subscription.go @@ -20,14 +20,17 @@ package xapp import ( + "bytes" "encoding/json" "fmt" "github.com/go-openapi/loads" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/runtime/middleware" "github.com/go-openapi/strfmt" + "github.com/spf13/viper" "io/ioutil" "net/http" + "strings" "time" apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi" @@ -48,6 +51,7 @@ import ( type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error) type SubscriptionQueryHandler func() (models.SubscriptionList, error) type SubscriptionDeleteHandler func(string) error +type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse) type Subscriber struct { localAddr string @@ -56,6 +60,8 @@ type Subscriber struct { remoteUrl string remoteProt []string timeout time.Duration + clientUrl string + clientCB SubscriptionResponseCallback } func NewSubscriber(host string, timo int) *Subscriber { @@ -67,18 +73,34 @@ func NewSubscriber(host string, timo int) *Subscriber { timo = 20 } - return &Subscriber{ + r := &Subscriber{ remoteHost: host, remoteUrl: "/ric/v1", remoteProt: []string{"http"}, timeout: time.Duration(timo) * time.Second, localAddr: "0.0.0.0", localPort: 8088, + clientUrl: "/ric/v1/subscriptions/response", + } + Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST") + + return r +} + +func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) { + if req.Body != nil { + var resp apimodel.SubscriptionResponse + if err := json.NewDecoder(req.Body).Decode(&resp); err == nil { + if r.clientCB != nil { + r.clientCB(&resp) + } + } + req.Body.Close() } } // Server interface: listen and receive subscription requests -func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error { +func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error { swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON) if err != nil { return err @@ -86,10 +108,10 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle api := operations.NewXappFrameworkAPI(swaggerSpec) - // Subscription: query + // Subscription: Query api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc( func(p query.GetAllSubscriptionsParams) middleware.Responder { - if resp, err := get(); err == nil { + if resp, err := getSubscription(); err == nil { return query.NewGetAllSubscriptionsOK().WithPayload(resp) } return query.NewGetAllSubscriptionsInternalServerError() @@ -98,25 +120,25 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle // SubscriptionType: Report api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc( func(p report.SubscribeReportParams) middleware.Responder { - if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil { + if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil { return report.NewSubscribeReportCreated().WithPayload(resp) } return report.NewSubscribeReportInternalServerError() }) - // SubscriptionType: policy + // SubscriptionType: Policy api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc( func(p policy.SubscribePolicyParams) middleware.Responder { - if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil { + if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil { return policy.NewSubscribePolicyCreated().WithPayload(resp) } return policy.NewSubscribePolicyInternalServerError() }) - // SubscriptionType: delete + // SubscriptionType: Delete api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc( func(p common.UnsubscribeParams) middleware.Responder { - if err := del(p.SubscriptionID); err == nil { + if err := delSubscription(p.SubscriptionID); err == nil { return common.NewUnsubscribeNoContent() } return common.NewUnsubscribeInternalServerError() @@ -134,6 +156,50 @@ func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandle return nil } +// Server interface: send notification to client +func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) { + respData, err := json.Marshal(resp) + if err != nil { + Logger.Error("json.Marshal failed: %v", err) + return err + } + + port := strings.Split(viper.GetString("local.host"), ":")[1] + clientUrl := fmt.Sprintf("http://%s:%s%s", clientEndpoint, port, r.clientUrl) + + retries := viper.GetInt("subscription.retryCount") + if retries == 0 { + retries = 10 + } + + delay := viper.GetInt("subscription.retryDelay") + if delay == 0 { + delay = 5 + } + + for i := 0; i < retries; i++ { + r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData)) + if err == nil && (r != nil && r.StatusCode == http.StatusOK) { + break + } + + if err != nil { + Logger.Error("%v", err) + } + if r != nil && r.StatusCode != http.StatusOK { + Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode) + } + time.Sleep(time.Duration(delay) * time.Second) + } + + return err +} + +// Subscription interface for xApp: Response callback +func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) { + r.clientCB = c +} + // Subscription interface for xApp: REPORT func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) { params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p) diff --git a/pkg/xapp/subscription_test.go b/pkg/xapp/subscription_test.go index b9713af..a5601cc 100755 --- a/pkg/xapp/subscription_test.go +++ b/pkg/xapp/subscription_test.go @@ -7,28 +7,29 @@ package xapp import ( - apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "github.com/stretchr/testify/assert" "testing" "time" + "fmt" ) var suite *testing.T var meid = "gnb123456" var funId = int64(1) -var clientEndpoint = "localhost:4561" +var clientEndpoint = "localhost" var direction = int64(0) var procedureCode = int64(27) var typeOfMessage = int64(1) -var reportParams = apimodel.ReportParams{ - Meid: meid, - RANFunctionID: &funId, +var reportParams = clientmodel.ReportParams{ + Meid: meid, + RANFunctionID: &funId, ClientEndpoint: &clientEndpoint, - EventTriggers: apimodel.EventTriggerList{ - &apimodel.EventTrigger{ + EventTriggers: clientmodel.EventTriggerList{ + &clientmodel.EventTrigger{ InterfaceDirection: direction, ProcedureCode: procedureCode, TypeOfMessage: typeOfMessage, @@ -36,18 +37,34 @@ var reportParams = apimodel.ReportParams{ }, } -var policyParams = apimodel.PolicyParams{ - Meid: &meid, - RANFunctionID: &funId, +var policyParams = clientmodel.PolicyParams{ + Meid: &meid, + RANFunctionID: &funId, ClientEndpoint: &clientEndpoint, - EventTriggers: apimodel.EventTriggerList{ - &apimodel.EventTrigger{ + EventTriggers: clientmodel.EventTriggerList{ + &clientmodel.EventTrigger{ InterfaceDirection: direction, ProcedureCode: procedureCode, TypeOfMessage: typeOfMessage, }, }, - PolicyActionDefinitions: &apimodel.PolicyActionDefinition{}, + PolicyActionDefinitions: &clientmodel.PolicyActionDefinition{}, +} + +func processSubscriptions(subscriptionId string) { + // Generate requestorId, instanceId + reqId := int64(11) + instanceId := int64(22) + + resp := &models.SubscriptionResponse{ + SubscriptionID: &subscriptionId, + SubscriptionInstances: []*models.SubscriptionInstance{ + &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId}, + }, + } + + // Notify the client: don't worry about errors ... Notify() will handle retries, etc. + Subscription.Notify(resp, clientEndpoint) } func subscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) { @@ -65,15 +82,15 @@ func subscriptionHandler(stype models.SubscriptionType, params interface{}) (*mo assert.Equal(suite, clientEndpoint, *p.ClientEndpoint) } - subId := "xapp-11" - reqId := int64(11) - instanceId := int64(22) + // Generate a unique subscriptionId + subscriptionId := fmt.Sprintf("%s-%s", meid, clientEndpoint) + + // Process subscriptions on the background + go processSubscriptions(subscriptionId) + + // and send response immediately return &models.SubscriptionResponse{ - SubscriptionID: &subId, - SubscriptionInstances: []*models.SubscriptionInstance{ - &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId}, - &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId}, - }, + SubscriptionID: &subscriptionId, }, nil } @@ -113,27 +130,29 @@ func TestSubscriptionQueryHandling(t *testing.T) { } func TestSubscriptionReportHandling(t *testing.T) { - resp, err := Subscription.SubscribeReport(&reportParams) + Subscription.SetResponseCB(func(resp *clientmodel.SubscriptionResponse) { + assert.Equal(t, len(resp.SubscriptionInstances), 1) + assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11)) + assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22)) + }) + _, err := Subscription.SubscribeReport(&reportParams) assert.Equal(t, err, nil) - assert.Equal(t, len(resp.SubscriptionInstances), 2) - assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11)) - assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22)) - assert.Equal(t, *resp.SubscriptionInstances[1].RequestorID, int64(11)) - assert.Equal(t, *resp.SubscriptionInstances[1].InstanceID, int64(22)) } func TestSubscriptionPolicytHandling(t *testing.T) { - resp, err := Subscription.SubscribePolicy(&policyParams) + Subscription.SetResponseCB(func(resp *clientmodel.SubscriptionResponse) { + assert.Equal(t, len(resp.SubscriptionInstances), 1) + assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11)) + assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22)) + }) + _, err := Subscription.SubscribePolicy(&policyParams) assert.Equal(t, err, nil) - assert.Equal(t, len(resp.SubscriptionInstances), 2) - assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11)) - assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22)) } func TestSubscriptionDeleteHandling(t *testing.T) { err := Subscription.UnSubscribe(clientEndpoint) assert.Equal(t, err, nil) -} \ No newline at end of file +} diff --git a/pkg/xapp/xapp_test.go b/pkg/xapp/xapp_test.go index e52c3ee..9696e6e 100755 --- a/pkg/xapp/xapp_test.go +++ b/pkg/xapp/xapp_test.go @@ -190,44 +190,44 @@ func TestMessagesReceivedSuccessfullyUsingWh(t *testing.T) { } func TestMessagesReceivedSuccessfullyUsingWhCall(t *testing.T) { - time.Sleep(time.Duration(5) * time.Second) - whid := Rmr.Openwh("localhost:4560") - params := &RMRParams{} - params.Payload = []byte("newrt|start\nnewrt|end\n") - params.Whid = int(whid) - params.Callid = 4 - params.Timeout = 1000 - Rmr.SendCallMsg(params) + time.Sleep(time.Duration(5) * time.Second) + whid := Rmr.Openwh("localhost:4560") + params := &RMRParams{} + params.Payload = []byte("newrt|start\nnewrt|end\n") + params.Whid = int(whid) + params.Callid = 4 + params.Timeout = 1000 + Rmr.SendCallMsg(params) // Allow time to process the messages time.Sleep(time.Duration(2) * time.Second) - waitForSdl := viper.GetBool("db.waitForSdl") - stats := getMetrics(t) - if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") { - t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats) - } + waitForSdl := viper.GetBool("db.waitForSdl") + stats := getMetrics(t) + if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") { + t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats) + } - if !strings.Contains(stats, "ricxapp_RMR_Received 201") { - t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats) - } + if !strings.Contains(stats, "ricxapp_RMR_Received 201") { + t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats) + } - if !strings.Contains(stats, "ricxapp_RMR_TransmitError 1") { - t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect") - } + if !strings.Contains(stats, "ricxapp_RMR_TransmitError 1") { + t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect") + } - if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") { - t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect") - } + if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") { + t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect") + } - if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 201") { - t.Errorf("Error: ricxapp_SDL_Stored value incorrect") - } + if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 201") { + t.Errorf("Error: ricxapp_SDL_Stored value incorrect") + } - if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") { - t.Errorf("Error: ricxapp_SDL_StoreError value incorrect") - } - Rmr.Closewh(int(whid)) + if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") { + t.Errorf("Error: ricxapp_SDL_StoreError value incorrect") + } + Rmr.Closewh(int(whid)) } func TestSubscribeChannels(t *testing.T) {