From c92b421ec9f89e77df36422987e478ed8db85299 Mon Sep 17 00:00:00 2001 From: Anssi Mannila Date: Mon, 7 Dec 2020 14:59:34 +0200 Subject: [PATCH] Submgr restart improvement Change-Id: I02388dea48b826eac09aec0308d85bcdcf812a6f Signed-off-by: Anssi Mannila --- Dockerfile | 2 +- config/submgr-config.yaml | 9 +- container-tag.yaml | 2 +- docs/user-guide.rst | 110 +++++++- e2ap/pkg/e2ap/msg_e2ap_subscription.go | 4 +- go.mod | 6 + go.sum | 10 + pkg/control/control.go | 249 +++++++++++++++--- pkg/control/registry.go | 53 +++- pkg/control/sdl.go | 229 ++++++++++++++++ pkg/control/sdl_test.go | 459 +++++++++++++++++++++++++++++++++ pkg/control/subscription.go | 26 +- pkg/control/types.go | 8 + pkg/control/ut_ctrl_submgr_test.go | 57 ++++ pkg/control/ut_messaging_test.go | 245 ++++++++++++++++++ test/config-file.json | 3 +- 16 files changed, 1402 insertions(+), 70 deletions(-) create mode 100644 pkg/control/sdl.go create mode 100644 pkg/control/sdl_test.go diff --git a/Dockerfile b/Dockerfile index 90eac73..4b41bd4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ ########################################################### # ########################################################### -FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:9-u18.04 as submgrcore +FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as submgrcore RUN apt update && apt install -y iputils-ping net-tools curl tcpdump gdb valgrind diff --git a/config/submgr-config.yaml b/config/submgr-config.yaml index a594ab4..25eb868 100644 --- a/config/submgr-config.yaml +++ b/config/submgr-config.yaml @@ -10,9 +10,16 @@ "hostAddr": "localhost" "port" : "8989" "baseUrl" : "/ric/v1" +"db": + "sessionNamespace": "XMSession" + "host": ":6379" + "prot": "tcp" + "maxIdle": 80 + "maxActive": 12000 "controls": "e2tSubReqTimeout_ms": 2000 "e2tSubDelReqTime_ms": 2000 "e2tRecvMsgTimeout_ms": 2000 "e2tMaxSubReqTryCount": 2 - "e2tMaxSubDelReqTryCount": 2 \ No newline at end of file + "e2tMaxSubDelReqTryCount": 2 + "readSubsFromDb": "true" diff --git a/container-tag.yaml b/container-tag.yaml index 0b276f1..39fb375 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: "0.5.8" +tag: "0.5.9" diff --git a/docs/user-guide.rst b/docs/user-guide.rst index 28d2e79..3247860 100755 --- a/docs/user-guide.rst +++ b/docs/user-guide.rst @@ -112,13 +112,19 @@ Architecture :width: 600 :alt: Subscription failure picture - * Timeout case + * Timeout in Subscription Manager In case of timeout in Subscription Manager, Subscription Manager may resend the RIC Subscription Request to RAN. If there is no response after retry, Subscription Manager shall NOT send any response to xApp. xApp may retry RIC Subscription Request, if it wishes to do so. Subscription Manager does no handle the retry if Subscription Manager has ongoing subscription procedure for the same subscription. Subscription just drops the request. + * Timeout in xApp + + xApp may resend the same request if there is no response in expected time. If xApp resends the same request while processing of previous + request has not been completed in Subscription Manager then Subscription Manager drops the new request, makes a log writing and continues + processing previous request. + .. image:: images/Subscription_Timeout.png :width: 600 :alt: Subscription timeout picture @@ -146,11 +152,17 @@ Architecture :width: 600 :alt: Subscription delete failure picture - * Timeout case + * Timeout in Subscription Manager In case of timeout in Subscription Manager, Subscription Manager may resend the RIC Subscription Delete Request to RAN. If there is no response after retry, Subscription Manager responds to xApp with RIC Subscription Delete Response. + * Timeout in xApp + + xApp may resend the same request if there is no response in expected time. If xApp resends the same request while processing of previous + request has not been completed in Subscription Manager then Subscription Manager drops the new request, makes a log writing and continues + processing previous request. + .. image:: images/Subscription_Delete_Timeout.png :width: 600 :alt: Subscription delete timeout picture @@ -183,11 +195,17 @@ Architecture Failure case is basically the same as in normal subscription procedure. Failure can come only from RAN when merge is not yet done. If error happens during route create Subscription Manager drops the RIC Subscription Request message and xApp does not get any response. - * Timeout case + * Timeout in Subscription Manager Timeout case is basically the same as in normal subscription procedure but timeout can come only in route create during merge operation. If error happens during route create, Subscription Manager drops the RIC Subscription Request message and xApp does not get any response. + * Timeout in xApp + + xApp may resend the same request if there is no response in expected time. If xApp resends the same request while processing of previous + request has not been completed in Subscription Manager then Subscription Manager drops the new request, makes a log writing and continues + processing previous request. + * Subscription delete merge procedure * Successful case @@ -205,19 +223,91 @@ Architecture Delete procedure cannot fail from xApp point of view. Subscription Manager responds with RIC Subscription Delete Response message to xApp. - * Timeout case + * Timeout in Subscription Manager Timeout can only happen in route delete to Routing manager. Subscription Manager responds with RIC Subscription Delete Response message to xApp. + * Timeout in xApp + + xApp may resend the same request if there is no response in expected time. If xApp resends the same request while processing of previous + request has not been completed in Subscription Manager then Subscription Manager drops the new request, makes a log writing and continues + processing previous request. + * Unknown message If Subscription Manager receives unknown message, Subscription Manager drops the message. * xApp restart - When xApp is restarted for any reason it may resend the subscriptions which have already been subscribed. In this case Subscription Manager sends - successful response to such requests without updating Routing Manager and BTS. In restart IP address of the xApp may change but service address name - does not. Message routing uses service address name. + When xApp is restarted for any reason it may resend subscription requests for subscriptions which have already been subscribed. If REPORT or INSERT type + subscription already exists and RMR endpoint of requesting xApp is attached to subscription then successful response is sent to xApp directly without + updating Routing Manager and BTS. If POLICY type subscription already exists, request is forwarded to BTS and successful response is sent to xApp. + BTS is expected to accept duplicate POLICY type requests. In restart IP address of the xApp may change but domain service address name does not. + RMR message routing uses domain service address name. + + * Subscription Manager restart + + Subscription Manager stores successfully described subscriptions from db (SDL). Subscriptions are restored from db in restart. For subscriptions which + were not successfully completed, Subscription Manager sends delete request to BTS and removes routes created for those. Restoring subscriptions from + db can be disable via submgr-config.yaml file by setting "readSubsFromDb": "false". + +REST interface for debugging and testing +---------------------------------------- + Give following commands to get Subscription Manager pod's IP address + + .. code-block:: none + + kubectl get pods -A | grep submgr + + ricplt submgr-75bccb84b6-n9vnt 1/1 Running 0 81m + + Syntax: kubectl exec -t -n ricplt -- cat /etc/hosts | grep submgr | awk '{print $1}' + + Example: kubectl exec -t -n ricplt submgr-75bccb84b6-n9vnt -- cat /etc/hosts | grep submgr | awk '{print $1}' + + 10.244.0.181 + + Get subscriptions + + .. code-block:: none + + Example: curl -X GET "http://10.244.0.181:8088/ric/v1/subscriptions" + + [] + + Delete single subscription from db + + .. code-block:: none + + Syntax: curl -X POST "http://10.244.0.181:8080/ric/v1/test/deletesubid={SubscriptionId}" + + Example: curl -X POST "http://10.244.0.181:8080/ric/v1/test/deletesubid=1" + + Remove all subscriptions from db + + .. code-block:: none + + Example: curl -X POST "http://10.244.0.181:8080/ric/v1/test/emptydb" + + Make Subscription Manager restart + + .. code-block:: none + + Example: curl -X POST "http://10.244.0.181:8080/ric/v1/test/restart" + + Use this command to get Subscription Manager's log writings + + .. code-block:: none + + Example: kubectl logs -n ricplt submgr-75bccb84b6-n9vnt + + Logger level in configmap.yaml file in Helm chart is by default 2. It means that only info logs are printed. + To see debug log writings it has to be changed to 4. + + .. code-block:: none + + "logger": + "level": 4 RAN services explained ---------------------- @@ -243,6 +333,8 @@ Supported E2 procedures and RAN services Recommendations for xApps ------------------------- - * Recommended retry delay + * Recommended retry delay in xApp - Recommended retry delay for xApp is >= 5 seconds + Recommended retry delay for xApp is >= 5 seconds. Length of supervising timers in Subscription Manager for the requests it sends to BTS is by default 2 + seconds. Subscription Manager makes one retry by default. There can be only one ongoing request towards per RAN other requests are queued in Subscription + Manager. diff --git a/e2ap/pkg/e2ap/msg_e2ap_subscription.go b/e2ap/pkg/e2ap/msg_e2ap_subscription.go index 7a1f9e0..8c6910e 100644 --- a/e2ap/pkg/e2ap/msg_e2ap_subscription.go +++ b/e2ap/pkg/e2ap/msg_e2ap_subscription.go @@ -35,8 +35,8 @@ type E2APSubscriptionRequest struct { type E2APSubscriptionResponse struct { RequestId FunctionId - ActionAdmittedList - ActionNotAdmittedList + ActionAdmittedList ActionAdmittedList // SDL JSON encode requires this format (name data-type) to work correctly + ActionNotAdmittedList ActionNotAdmittedList // SDL JSON encode requires this format (name data-type) to work correctly } //----------------------------------------------------------------------------- diff --git a/go.mod b/go.mod index 893caca..41951fc 100644 --- a/go.mod +++ b/go.mod @@ -6,18 +6,24 @@ replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo => gerrit.o-ran-sc.org/r/ric-plt/sdl replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.5.12 +//replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.6.7 + replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.1 +//replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.2 + replace gerrit.o-ran-sc.org/r/ric-plt/e2ap => ./e2ap/ require ( gerrit.o-ran-sc.org/r/ric-plt/e2ap v0.0.0-00010101000000-000000000000 + gerrit.o-ran-sc.org/r/ric-plt/sdlgo v0.5.0 gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.0-00010101000000-000000000000 github.com/go-openapi/errors v0.19.3 github.com/go-openapi/runtime v0.19.4 github.com/go-openapi/strfmt v0.19.4 github.com/go-openapi/swag v0.19.7 github.com/go-openapi/validate v0.19.6 + github.com/gorilla/mux v1.7.1 github.com/spf13/viper v1.4.0 github.com/stretchr/testify v1.5.1 ) diff --git a/go.sum b/go.sum index 844d9ec..501ad6d 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= gerrit.o-ran-sc.org/r/com/golog.git v0.0.1 h1:9RfO/Whehaaq5KiJTT7s+YOzmi9mT1C3HktfhwwMEmw= gerrit.o-ran-sc.org/r/com/golog.git v0.0.1/go.mod h1:b8YB31U8/4iRpABioeSzGi/YMzOQ/Zq7hrJmmXKqlJk= +gerrit.o-ran-sc.org/r/com/golog.git v0.0.2 h1:Ix6SgFuzd6yW6Ur6+qDlGhDO65UYs8PiIkeAL1VaQ2o= +gerrit.o-ran-sc.org/r/com/golog.git v0.0.2/go.mod h1:A7hUL52YQSO4dFIZNcj76XQ09C9PftAe3LyL7kqBnok= gerrit.o-ran-sc.org/r/ric-plt/alarm-go.git/alarm v0.4.2 h1:XNfkp3PwZ7pfkPszX7NaX6DzToCGjcWTLbIHYqCFNu0= gerrit.o-ran-sc.org/r/ric-plt/alarm-go.git/alarm v0.4.2/go.mod h1:AdEWKtERGvOQy9ybLhyhrb9w9LLVn8i9xzTwoR5n4BY= gerrit.o-ran-sc.org/r/ric-plt/nodeb-rnib.git/common v1.0.35 h1:TGXHb4DNY8on+ej4S9VUnk2HibIC/5chDy64OE+bQBQ= @@ -17,6 +19,8 @@ gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.4.18 h1:z/p6unCmbSAK2w8cwSPn1vIS gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.4.18/go.mod h1:bjhhEII07w+zPQzyvqTq84TjKQPa6IkcnSyWB1DfGHo= gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.5.12 h1:z5LWpefAw7JjKn3WSQ4+/f0PilIrr70qCTKwTi8R7MI= gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.5.12/go.mod h1:bjhhEII07w+zPQzyvqTq84TjKQPa6IkcnSyWB1DfGHo= +gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.6.7 h1:lTURRbfaV0kPcplGF7jyowRXoUkNknRzd7Y51ZPs5PI= +gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.6.7/go.mod h1:fpHeoGISAkz6bNfgZtq8Ycg9i9KdGgUBP2jLs6TGk2g= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -59,6 +63,7 @@ github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI= github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik= github.com/go-openapi/analysis v0.18.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik= @@ -203,6 +208,8 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= +github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= @@ -300,3 +307,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= +k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw= +k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/pkg/control/control.go b/pkg/control/control.go index 9f90252..7d180ad 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -21,14 +21,20 @@ package control import ( "fmt" + "net/http" + "os" + "strconv" + "strings" + "time" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" + "github.com/gorilla/mux" "github.com/spf13/viper" - "time" ) //----------------------------------------------------------------------------- @@ -59,14 +65,17 @@ var e2tSubDelReqTime time.Duration var e2tRecvMsgTimeout time.Duration var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry +var readSubsFromDb string type Control struct { *xapp.RMRClient e2ap *E2ap registry *Registry tracker *Tracker + db Sdlnterface //subscriber *xapp.Subscriber - CntRecvMsg uint64 + CntRecvMsg uint64 + ResetTestFlag bool } type RMRMeid struct { @@ -75,6 +84,9 @@ type RMRMeid struct { RanName string } +type SubmgrRestartTestEvent struct{} +type SubmgrRestartUpEvent struct{} + func init() { xapp.Logger.Info("SUBMGR") viper.AutomaticEnv() @@ -84,6 +96,54 @@ func init() { func NewControl() *Control { + ReadConfigParameters() + transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) + rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)} + + registry := new(Registry) + registry.Initialize() + registry.rtmgrClient = &rtmgrClient + + tracker := new(Tracker) + tracker.Init() + + //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) + + c := &Control{e2ap: new(E2ap), + registry: registry, + tracker: tracker, + db: CreateSdl(), + //subscriber: subscriber, + } + + // Register REST handler for testing support + xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST") + + go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler) + //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) + + if readSubsFromDb == "false" { + return c + } + + // Read subscriptions from db + xapp.Logger.Info("Reading subscriptions from db") + subIds, register, err := c.ReadAllSubscriptionsFromSdl() + if err != nil { + xapp.Logger.Error("%v", err) + } else { + c.registry.subIds = subIds + c.registry.register = register + c.HandleUncompletedSubscriptions(register) + } + return c +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func ReadConfigParameters() { + // viper.GetDuration returns nanoseconds e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000 if e2tSubReqTimeout == 0 { @@ -111,26 +171,26 @@ func NewControl() *Control { } xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount) - transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) - rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)} - - registry := new(Registry) - registry.Initialize() - registry.rtmgrClient = &rtmgrClient - - tracker := new(Tracker) - tracker.Init() - - //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) + readSubsFromDb = viper.GetString("controls.readSubsFromDb") + if readSubsFromDb == "" { + readSubsFromDb = "true" + } + xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb) +} - c := &Control{e2ap: new(E2ap), - registry: registry, - tracker: tracker, - //subscriber: subscriber, +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) { + + xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register)) + for subId, subs := range register { + if subs.SubRespRcvd == false { + subs.NoRespToXapp = true + xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId) + c.SendSubscriptionDeleteReq(subs) + } } - go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler) - //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) - return c } func (c *Control) ReadyCB(data interface{}) { @@ -164,14 +224,47 @@ func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params inte return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented") } -func (c *Control) SubscriptionDeleteHandler(string) error { - return fmt.Errorf("Subscription rest interface not implemented") +func (c *Control) SubscriptionDeleteHandler(s string) error { + return nil } func (c *Control) QueryHandler() (models.SubscriptionList, error) { return c.registry.QueryHandler() } +func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) { + + xapp.Logger.Info("TestRestHandler() called") + + pathParams := mux.Vars(r) + s := pathParams["testId"] + + // This can be used to delete single subscription from db + if contains := strings.Contains(s, "deletesubid="); contains == true { + var splits = strings.Split(s, "=") + if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil { + xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId) + c.RemoveSubscriptionFromSdl(uint32(subId)) + return + } + } + + // This can be used to remove all subscriptions db from + if s == "emptydb" { + xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called") + c.RemoveAllSubscriptionsFromSdl() + return + } + + // This is meant to cause submgr's restart in testing + if s == "restart" { + xapp.Logger.Info("os.Exit(1) called") + os.Exit(1) + } + + xapp.Logger.Info("Unsupported rest command received %s", s) +} + //------------------------------------------------------------------- // //------------------------------------------------------------------- @@ -270,7 +363,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { } //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it? - subs, err := c.registry.AssignToSubscription(trans, subReqMsg) + subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag) if err != nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) return @@ -281,7 +374,6 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { // go c.handleSubscriptionCreate(subs, trans) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side - err = nil if event != nil { switch themsg := event.(type) { @@ -344,7 +436,12 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) - // Whatever is received send ok delete response + if subs.NoRespToXapp == true { + // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions + return + } + + // Whatever is received success, fail or timeout, send successful delete response subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{} subDelRespMsg.RequestId = subs.GetReqId().RequestId subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId @@ -362,6 +459,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { //------------------------------------------------------------------- func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) { + var removeSubscriptionFromDb bool = false trans := c.tracker.NewSubsTransaction(subs) subs.WaitTransactionTurn(trans) defer subs.ReleaseTransactionTurn(trans) @@ -386,7 +484,9 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: subRfMsg, valid = subs.SetCachedResponse(event, true) + subs.SubRespRcvd = true case *e2ap.E2APSubscriptionFailure: + removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(event, false) doRetry = true for _, item := range themsg.ActionNotAdmittedList.Items { @@ -397,13 +497,17 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran } xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans)) c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) + case *SubmgrRestartTestEvent: + // This simulates that no response has been received and after restart subscriptions are restored from db + xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case") + return default: xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) + removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(nil, false) c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) } } - xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } else { xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) @@ -411,8 +515,10 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) if valid == false { - c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) + c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c) } + + c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb) parentTrans.SendEvent(subRfMsg, 0) } @@ -440,7 +546,8 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...)) - c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) + c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c) + c.registry.UpdateSubscriptionToDb(subs, c) parentTrans.SendEvent(nil, 0) } @@ -460,12 +567,19 @@ func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transact return event } + // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart + c.WriteSubscriptionToDb(subs) for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ { desc := fmt.Sprintf("(retry %d)", retries) c.rmrSendToE2T(desc, subs, trans) - event, timedOut = trans.WaitEvent(e2tSubReqTimeout) - if timedOut { - continue + if subs.DoNotWaitSubResp == false { + event, timedOut = trans.WaitEvent(e2tSubReqTimeout) + if timedOut { + continue + } + } else { + // Simulating case where subscrition request has been sent but response has not been received before restart + event = &SubmgrRestartTestEvent{} } break } @@ -644,3 +758,76 @@ func typeofSubsMessage(v interface{}) string { return "Unknown" } } + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) WriteSubscriptionToDb(subs *Subscription) { + xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId) + err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs) + if err != nil { + xapp.Logger.Error("%v", err) + } +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) { + + if removeSubscriptionFromDb == true { + // Subscription was written in db already when subscription request was sent to BTS, except for merged request + c.RemoveSubscriptionFromDb(subs) + } else { + // Update is needed for successful response and merge case here + if subs.RetryFromXapp == false { + c.WriteSubscriptionToDb(subs) + } + } + subs.RetryFromXapp = false +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) { + xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId) + err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId) + if err != nil { + xapp.Logger.Error("%v", err) + } +} + +func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { + + xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId) + + // Send delete for every endpoint in the subscription + subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{} + subDelReqMsg.RequestId = subs.GetReqId().RequestId + subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId + mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg) + if err != nil { + xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err)) + return + } + for _, endPoint := range subs.EpList.Endpoints { + params := &xapp.RMRParams{} + params.Mtype = mType + params.SubId = int(subs.GetReqId().InstanceId) + params.Xid = "" + params.Meid = subs.Meid + params.Src = endPoint.String() + params.PayloadLen = len(payload.Buf) + params.Payload = payload.Buf + params.Mbuf = nil + + if params == nil { + xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil") + return + } + + subs.DeleteFromDb = true + c.handleXAPPSubscriptionDeleteRequest(params) + } +} diff --git a/pkg/control/registry.go b/pkg/control/registry.go index 7ad54c1..e07116c 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -42,8 +42,8 @@ type Registry struct { func (r *Registry) Initialize() { r.register = make(map[uint32]*Subscription) var i uint32 - for i = 0; i < 65535; i++ { - r.subIds = append(r.subIds, i+1) + for i = 1; i < 65535; i++ { + r.subIds = append(r.subIds, i) } } @@ -60,7 +60,7 @@ func (r *Registry) QueryHandler() (models.SubscriptionList, error) { return resp, nil } -func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { +func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) { if len(r.subIds) > 0 { subId := r.subIds[0] r.subIds = r.subIds[1:] @@ -69,19 +69,26 @@ func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubs return nil, fmt.Errorf("Registry: Failed to reserve subscription exists") } subs := &Subscription{ - registry: r, - Meid: trans.Meid, - SubReqMsg: subReqMsg, - valid: true, + registry: r, + Meid: trans.Meid, + SubReqMsg: subReqMsg, + valid: true, + RetryFromXapp: false, + SubRespRcvd: false, + DeleteFromDb: false, + NoRespToXapp: false, + DoNotWaitSubResp: false, } subs.ReqId.Id = 123 subs.ReqId.InstanceId = subId + if resetTestFlag == true { + subs.DoNotWaitSubResp = true + } if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false { r.subIds = append(r.subIds, subs.ReqId.InstanceId) return nil, fmt.Errorf("Registry: Endpoint existing already in subscription") } - return subs, nil } return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids") @@ -121,7 +128,7 @@ func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2AP return nil, false } -func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { +func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) { var err error var newAlloc bool r.mutex.Lock() @@ -151,15 +158,16 @@ func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap. subs, endPointFound := r.findExistingSubs(trans, subReqMsg) if subs == nil { - subs, err = r.allocateSubs(trans, subReqMsg) + subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag) if err != nil { return nil, err } newAlloc = true } else if endPointFound == true { // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted. + subs.RetryFromXapp = true xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String()) - xapp.Logger.Debug("Registry: substable=%v", r.register) + //xapp.Logger.Debug("Registry: substable=%v", r.register) return subs, nil } @@ -231,7 +239,7 @@ func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (ui } // TODO: Works with concurrent calls, but check if can be improved -func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error { +func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error { r.mutex.Lock() defer r.mutex.Unlock() @@ -276,7 +284,6 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction xapp.Logger.Debug("Registry: substable=%v", r.register) } r.subIds = append(r.subIds, subId) - } else if subs.EpList.Size() > 0 { // // Subscription route updates @@ -284,12 +291,30 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)} r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) } - }() return nil } +func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) { + r.mutex.Lock() + defer r.mutex.Unlock() + subs.mutex.Lock() + defer subs.mutex.Unlock() + + epamount := subs.EpList.Size() + if epamount == 0 { + if _, ok := r.register[subs.ReqId.InstanceId]; ok { + // Not merged subscription is being deleted + c.RemoveSubscriptionFromDb(subs) + + } + } else if subs.EpList.Size() > 0 { + // Endpoint of merged subscription is being deleted + c.WriteSubscriptionToDb(subs) + } +} + func (r *Registry) GetSubscription(subId uint32) *Subscription { r.mutex.Lock() defer r.mutex.Unlock() diff --git a/pkg/control/sdl.go b/pkg/control/sdl.go new file mode 100644 index 0000000..3a083fb --- /dev/null +++ b/pkg/control/sdl.go @@ -0,0 +1,229 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package control + +import ( + "encoding/json" + "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "strconv" +) + +type SubscriptionInfo struct { + Valid bool + ReqId RequestId + Meid xapp.RMRMeid + EpList xapp.RmrEndpointList + SubReqMsg e2ap.E2APSubscriptionRequest + SubRespMsg e2ap.E2APSubscriptionResponse + SubFailMsg e2ap.E2APSubscriptionFailure + SubRespRcvd string +} + +func CreateSdl() Sdlnterface { + return sdl.NewSdlInstance("submgr", sdl.NewDatabase()) +} + +func (c *Control) WriteSubscriptionToSdl(subId uint32, subs *Subscription) error { + + var subscriptionInfo SubscriptionInfo + subscriptionInfo.Valid = subs.valid + subscriptionInfo.ReqId = subs.ReqId + subscriptionInfo.Meid = *subs.Meid + subscriptionInfo.EpList = subs.EpList + subscriptionInfo.SubReqMsg = *subs.SubReqMsg + + if typeofSubsMessage(subs.SubRFMsg) == "SubResp" { + subscriptionInfo.SubRespRcvd = "SubResp" + subscriptionInfo.SubRespMsg = *subs.SubRFMsg.(*e2ap.E2APSubscriptionResponse) + } else if typeofSubsMessage(subs.SubRFMsg) == "SubFail" { + subscriptionInfo.SubRespRcvd = "SubFail" + subscriptionInfo.SubFailMsg = *subs.SubRFMsg.(*e2ap.E2APSubscriptionFailure) + } else { + subscriptionInfo.SubRespRcvd = "" + } + + jsonData, err := json.Marshal(subscriptionInfo) + if err != nil { + return fmt.Errorf("SDL: WriteSubscriptionToSdl() json.Marshal error: %s", err.Error()) + } + + err = c.db.Set(strconv.FormatUint(uint64(subId), 10), jsonData) + if err != nil { + return fmt.Errorf("SDL: WriteSubscriptionToSdl(): %s", err.Error()) + } else { + xapp.Logger.Debug("SDL: Subscription written in db. subId = %v", subId) + } + return nil +} + +func (c *Control) ReadSubscriptionFromSdl(subId uint32) (*Subscription, error) { + + // This function is now just for testing purpose + key := strconv.FormatUint(uint64(subId), 10) + retMap, err := c.db.Get([]string{key}) + if err != nil { + return nil, fmt.Errorf("SDL: ReadSubscriptionFromSdl(): %s", err.Error()) + } else { + xapp.Logger.Debug("SDL: Subscription read from db. subId = %v", subId) + } + + subs := &Subscription{} + for _, iSubscriptionInfo := range retMap { + + if iSubscriptionInfo == nil { + return nil, fmt.Errorf("SDL: ReadSubscriptionFromSdl() subscription not found. subId = %v\n", subId) + } + + subscriptionInfo := &SubscriptionInfo{} + jsonSubscriptionInfo := iSubscriptionInfo.(string) + + err := json.Unmarshal([]byte(jsonSubscriptionInfo), subscriptionInfo) + if err != nil { + return nil, fmt.Errorf("SDL: ReadSubscriptionFromSdl() json.unmarshal error: %s\n", err.Error()) + } + + subs = c.CreateSubscription(subscriptionInfo, &jsonSubscriptionInfo) + } + return subs, nil +} + +func (c *Control) CreateSubscription(subscriptionInfo *SubscriptionInfo, jsonSubscriptionInfo *string) *Subscription { + + subs := &Subscription{} + subs.registry = c.registry + subs.valid = subscriptionInfo.Valid + subs.ReqId = subscriptionInfo.ReqId + meid := xapp.RMRMeid{} + meid = subscriptionInfo.Meid + subs.Meid = &meid + subs.EpList = subscriptionInfo.EpList + subs.TheTrans = nil + subReq := e2ap.E2APSubscriptionRequest{} + subReq = subscriptionInfo.SubReqMsg + subs.SubReqMsg = &subReq + + if subscriptionInfo.SubRespRcvd == "SubResp" { + subs.SubRespRcvd = true + subResp := e2ap.E2APSubscriptionResponse{} + subResp = subscriptionInfo.SubRespMsg + subs.SubRFMsg = &subResp + } else if subscriptionInfo.SubRespRcvd == "SubFail" { + subs.SubRespRcvd = false + subFail := e2ap.E2APSubscriptionFailure{} + subFail = subscriptionInfo.SubFailMsg + subs.SubRFMsg = &subFail + } else { + subs.SubRespRcvd = false + subs.SubRFMsg = nil + xapp.Logger.Debug("SDL: CreateSubscription() subscriptionInfo.SubRespRcvd == '', InstanceId=%v ", subscriptionInfo.ReqId.InstanceId) + } + return subs +} + +func (c *Control) RemoveSubscriptionFromSdl(subId uint32) error { + + key := strconv.FormatUint(uint64(subId), 10) + err := c.db.Remove([]string{key}) + if err != nil { + return fmt.Errorf("SDL: RemoveSubscriptionfromSdl(): %s\n", err.Error()) + } else { + xapp.Logger.Debug("SDL: Subscription removed from db. subId = %v", subId) + } + return nil +} + +func (c *Control) ReadAllSubscriptionsFromSdl() ([]uint32, map[uint32]*Subscription, error) { + + // Read all subscriptionInfos + var subIds []uint32 + var i uint32 + for i = 1; i < 65535; i++ { + subIds = append(subIds, i) + } + + retMap := make(map[uint32]*Subscription) + // Get all keys + keys, err := c.db.GetAll() + if err != nil { + return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl(), GetAll(). Error while reading keys from DBAAS %s\n", err.Error()) + } + + if len(keys) == 0 { + return subIds, retMap, nil + } + + // Get all subscriptionInfos + iSubscriptionMap, err := c.db.Get(keys) + if err != nil { + return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl(), Get(): Error while reading subscriptions from DBAAS %s\n", err.Error()) + } + + for _, iSubscriptionInfo := range iSubscriptionMap { + + if iSubscriptionInfo == nil { + return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() iSubscriptionInfo = nil\n") + } + + subscriptionInfo := &SubscriptionInfo{} + jsonSubscriptionInfo := iSubscriptionInfo.(string) + + err := json.Unmarshal([]byte(jsonSubscriptionInfo), subscriptionInfo) + if err != nil { + return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() json.unmarshal error: %s\n", err.Error()) + } + + subs := c.CreateSubscription(subscriptionInfo, &jsonSubscriptionInfo) + + if int(subscriptionInfo.ReqId.InstanceId) >= len(subIds) { + return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() index is out of range. Index is %d with slice length %d", subscriptionInfo.ReqId.InstanceId, len(subIds)) + } + retMap[subscriptionInfo.ReqId.InstanceId] = subs + + // Remove subId from free subIds. Original slice is modified here! + subIds, err = removeNumber(subIds, subscriptionInfo.ReqId.InstanceId) + if err != nil { + return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() error: %s\n", err.Error()) + } + } + return subIds, retMap, nil +} + +func removeNumber(s []uint32, removedNum uint32) ([]uint32, error) { + for i, num := range s { + if removedNum == uint32(num) { + s = append(s[:i], s[i+1:]...) + return s[:len(s)], nil + } + } + return nil, fmt.Errorf("SDL: To be removed number not in the slice. removedNum: %v", removedNum) +} +func (c *Control) RemoveAllSubscriptionsFromSdl() error { + + err := c.db.RemoveAll() + if err != nil { + return fmt.Errorf("SDL: RemoveAllSubscriptionsFromSdl(): %s\n", err.Error()) + } else { + xapp.Logger.Debug("SDL: All subscriptions removed from db") + } + return nil +} diff --git a/pkg/control/sdl_test.go b/pkg/control/sdl_test.go new file mode 100644 index 0000000..9198e5c --- /dev/null +++ b/pkg/control/sdl_test.go @@ -0,0 +1,459 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package control + +import ( + "encoding/json" + "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststube2ap" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "github.com/stretchr/testify/assert" + "reflect" + "strconv" + "testing" + "time" +) + +const ( + subsResponse = 1 + subsFailure = 2 + noResponse = 3 +) + +type Mock struct { + subsDB map[string]string // Store information as a string like real db does. + register map[uint32]*Subscription + subIds []uint32 + lastAllocatedSubId uint32 +} + +var mock *Mock + +func CreateMock() *Mock { + fmt.Println("Test CreateMock()") + mock = new(Mock) + mock.ResetTestSettings() + return mock +} + +func (m *Mock) ResetTestSettings() { + m.subsDB = make(map[string]string) + m.register = make(map[uint32]*Subscription) + var i uint32 + for i = 1; i < 65535; i++ { + m.subIds = append(m.subIds, i) + } +} + +func (m *Mock) AllocNextSubId() uint32 { + m.lastAllocatedSubId = m.subIds[0] + return m.lastAllocatedSubId +} + +func TestWait(t *testing.T) { + // Wait to test settings to complete + <-time.After(1 * time.Second) +} + +func GetSubscription(t *testing.T, e2SubId uint32, responseType int, srcEndPoint, ranName string, xId string) *Subscription { + t.Log("TEST: Getting subscription") + + subs := &Subscription{} + + // Create unpacked e2SubReqMsg + subReqParams := &teststube2ap.E2StubSubsReqParams{} + subReqParams.Init() + + meid := xapp.RMRMeid{} + meid.RanName = ranName + + params := &xapp.RMRParams{} + params.Src = srcEndPoint + params.Xid = xId + params.Meid = &meid + + // Create xApp transaction + trans := mainCtrl.c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqParams.Req.RequestId.InstanceId, params.Meid) + if trans == nil { + t.Errorf("TEST: %s", idstring(fmt.Errorf("transaction not created"), params)) + return nil + } + + // Allocate E2 instanceId/subId + subReqParams.Req.RequestId.InstanceId = e2SubId + + subs.ReqId.Id = 123 + subs.ReqId.InstanceId = subReqParams.Req.RequestId.InstanceId + subs.Meid = &meid + subs.EpList.AddEndpoint(trans.GetEndpoint()) + subs.SubReqMsg = subReqParams.Req + // subs.SubRFMsg contains received/cached SubscriptionResponse or SubscriptionFailure, nil in no response received + if responseType == subsResponse { + subs.SubRFMsg = GetSubsResponse(t, subReqParams.Req) + subs.valid = true + } else if responseType == subsFailure { + subs.SubRFMsg = GetSubsFailure(t, subReqParams.Req) + subs.valid = false + } else if responseType == noResponse { + subs.SubRFMsg = nil + subs.valid = false + } + return subs +} + +func GetSubsResponse(t *testing.T, req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse { + t.Log("TEST: Getting ricSubscriptionResponse") + + // Create e2SubRespMsg + resp := &e2ap.E2APSubscriptionResponse{} + resp.RequestId.Id = 123 + resp.RequestId.InstanceId = req.RequestId.InstanceId + resp.FunctionId = req.FunctionId + + resp.ActionAdmittedList.Items = make([]e2ap.ActionAdmittedItem, len(req.ActionSetups)) + for index := int(0); index < len(req.ActionSetups); index++ { + resp.ActionAdmittedList.Items[index].ActionId = req.ActionSetups[index].ActionId + } + + for index := uint64(0); index < 1; index++ { + item := e2ap.ActionNotAdmittedItem{} + item.ActionId = index + item.Cause.Content = 1 + item.Cause.Value = 1 + resp.ActionNotAdmittedList.Items = append(resp.ActionNotAdmittedList.Items, item) + } + return resp +} + +func GetSubsFailure(t *testing.T, req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionFailure { + t.Log("TEST: Getting ricSubscriptionFailure") + + fail := &e2ap.E2APSubscriptionFailure{} + fail.RequestId.Id = req.RequestId.Id + fail.RequestId.InstanceId = req.RequestId.InstanceId + fail.FunctionId = req.FunctionId + return fail +} + +func PrintSubscriptionData(t *testing.T, subs *Subscription) { + t.Log("TEST: subscription data") + t.Logf("TEST: subs.mutex = %v", subs.mutex) + t.Logf("TEST: subs.ReqId.InstanceId = %v", subs.ReqId.InstanceId) + t.Logf("TEST: subs.ReqId.Id = %v", subs.ReqId.Id) + t.Logf("TEST: subs.EpList = %v", subs.EpList) + t.Logf("TEST: subs.Meid.RanName = %v", subs.Meid.RanName) + t.Logf("TEST: subs.SubReqMsg = %v", subs.SubReqMsg.String()) + t.Logf("TEST: subs.valid = %v", subs.valid) + + if subs.SubRFMsg != nil { + switch typeofSubsMessage(subs.SubRFMsg) { + case "SubResp": + t.Logf("TEST: subs.SubRFMsg == SubResp") + subResp := subs.SubRFMsg.(*e2ap.E2APSubscriptionResponse) + t.Logf("TEST: subResp = %+v", subResp) + case "SubFail": + t.Logf("TEST: subs.SubRFMsg == SubFail") + subFail := subs.SubRFMsg.(*e2ap.E2APSubscriptionFailure) + t.Logf("TEST: subFail = %+v", subFail) + } + } else { + t.Logf("TEST: subs.SubRFMsg == nil") + } +} + +func TestWriteSubscriptionToSdl(t *testing.T) { + t.Log("TestWriteSubscriptionToSdl") + + // Write one subscription + subId := mock.AllocNextSubId() + subs := GetSubscription(t, subId, subsResponse, "localhost:13560", "RAN_NAME_1", "123456") + PrintSubscriptionData(t, subs) + t.Logf("TEST: Writing subId = %v\n", subId) + err := mainCtrl.c.WriteSubscriptionToSdl(subId, subs) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } +} + +func TestReadSubscriptionFromSdl(t *testing.T) { + t.Log("TestReadSubscriptionFromSdl") + + subId := mock.lastAllocatedSubId + t.Logf("Reading subId = %v\n", subId) + subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + PrintSubscriptionData(t, subs) + assert.Equal(t, mock.register[subId].SubReqMsg, subs.SubReqMsg) +} + +func TestRemoveSubscriptionFromSdl(t *testing.T) { + t.Log("TestRemoveSubscriptionFromSdl") + + subId := mock.lastAllocatedSubId + err := mainCtrl.c.RemoveSubscriptionFromSdl(subId) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + delete(mock.register, subId) + mock.subIds = append(mock.subIds, subId) + t.Logf("TEST: subscription removed from db. subId = %v", subId) +} + +func TestReadNotExistingSubscriptionFromSdl(t *testing.T) { + t.Log("TestReadNotExistingSubscriptionFromSdl") + + var subId uint32 = 0 + subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId) + if err != nil { + t.Logf("TEST: subscription not found from db. subId = %v", subId) + return + } + t.Errorf("TEST: subscription read from db. %v", subs.String()) + PrintSubscriptionData(t, subs) +} + +func TestReadNotExistingSubscriptionFromSdl2(t *testing.T) { + t.Log("TestReadNotExistingSubscriptionFromSdl") + + var subId uint32 = 7 + subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId) + if err != nil { + t.Logf("TEST: subscription not found from db. subId = %v", subId) + return + } + t.Errorf("TEST: subscription read from db. %v", subs.String()) + PrintSubscriptionData(t, subs) +} + +func TestRemoveNotExistingSubscriptionFromSdl(t *testing.T) { + t.Log("TestRemoveNotExistingSubscriptionFromSdl") + + var subId uint32 = 0 + err := mainCtrl.c.RemoveSubscriptionFromSdl(subId) + if err != nil { + t.Logf("TEST: %s", err.Error()) + return + } + t.Logf("TEST: subscription removed from db. subId = %v", subId) +} + +func TestWriteSubscriptionsToSdl(t *testing.T) { + t.Log("TestWriteSubscriptionsToSdl") + + // Write 1st subscription + subId := mock.AllocNextSubId() + t.Logf("TEST: Writing subId = %v\n", subId) + subs := GetSubscription(t, subId, subsResponse, "localhost:13560", "RAN_NAME_1", "123456") + PrintSubscriptionData(t, subs) + err := mainCtrl.c.WriteSubscriptionToSdl(subId, subs) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + t.Logf("TEST: subscription written in db = %v", subs.String()) + + // Write 2nd subscription + subId = mock.AllocNextSubId() + t.Logf("TEST:Writing subId = %v\n", subId) + subs = GetSubscription(t, subId, subsFailure, "localhost:13560", "RAN_NAME_2", "123457") + PrintSubscriptionData(t, subs) + err = mainCtrl.c.WriteSubscriptionToSdl(subId, subs) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + t.Logf("TEST: subscription written in db = %v", subs.String()) + + // Write 3rd subscription + subId = mock.AllocNextSubId() + t.Logf("TEST:Writing subId = %v\n", subId) + subs = GetSubscription(t, subId, noResponse, "localhost:13560", "RAN_NAME_3", "123458") + PrintSubscriptionData(t, subs) + err = mainCtrl.c.WriteSubscriptionToSdl(subId, subs) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + t.Logf("TEST: subscription written in db = %v", subs.String()) +} + +func TestReadSubscriptionsFromSdl(t *testing.T) { + t.Log("TestReadSubscriptionsFromSdl") + + // Subscription with subId 1 was added and and removed above. Then subscriptions with subIds 2, 3 and 4 was added + // Db subscriptions should now contain subIDs 2, 3 and 4 + var subId uint32 + for subId = 2; subId <= 4; subId++ { + subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId) + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + PrintSubscriptionData(t, subs) + } +} + +func TestReadAllSubscriptionsFromSdl(t *testing.T) { + t.Log("TestReadAllSubscriptionsFromSdl") + + // This test cases simulates submgr restart. SubIds and subscriptions are restored from db + // after initializing mock.subIds and mock.register + // var err error + subIds, register, err := mainCtrl.c.ReadAllSubscriptionsFromSdl() + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + // for _, subs := range mock.register { + for _, subs := range register { + PrintSubscriptionData(t, subs) + } + // SubIds slices before and after restart can't be directly compared as original slice is not stored + // in the db. SubId values 1, 2, 3, 4 are already removed from the beginning of subIds slice above + // so far. Next free subId is 5 in the beginning of mock.subIds slice. The db contains now however only + // 3 subscriptions with subIds 2, 3 and 4, so only subId values 2, 3, 4 are removed from the returned + // subIds slice and there next free value is 1 + assert.Equal(t, uint32(0x1), subIds[0]) +} + +func TestRemoveAllSubscriptionsFromSdl(t *testing.T) { + t.Log("TestRemoveAllSubscriptionsFromSdl") + + err := mainCtrl.c.RemoveAllSubscriptionsFromSdl() + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + t.Log("TEST: All subscription removed from db") +} + +func TestReadAllSubscriptionsFromSdl2(t *testing.T) { + t.Log("TestReadAllSubscriptionsFromSdl2") + + // This test cases simulates submgr startup. SubIds and subscriptions are restored from empty db + // after initializing mock.subIds and mock.register + subIds, register, err := mainCtrl.c.ReadAllSubscriptionsFromSdl() + if err != nil { + t.Errorf("TEST: %s", err.Error()) + return + } + for _, subs := range mock.register { + PrintSubscriptionData(t, subs) + } + assert.Equal(t, len(subIds), 65534) + assert.Equal(t, len(register), 0) +} + +func (m *Mock) Set(pairs ...interface{}) error { + var key string + var val string + + for _, v := range pairs { + reflectType := reflect.TypeOf(v) + switch reflectType.Kind() { + case reflect.Slice: + val = fmt.Sprintf("%s", v.([]uint8)) + default: + switch v.(type) { + case string: + key = v.(string) + default: + return fmt.Errorf("Set() error: Unexpected type\n") + } + } + } + + if key != "" { + m.subsDB[key] = val + subId := m.subIds[0] + subscriptionInfo := &SubscriptionInfo{} + err := json.Unmarshal([]byte(val), subscriptionInfo) + if err != nil { + return fmt.Errorf("Set() json.unmarshal error: %s\n", err.Error()) + } + + subs := mainCtrl.c.CreateSubscription(subscriptionInfo, &val) + m.register[subId] = subs + m.subIds = m.subIds[1:] + } else { + return fmt.Errorf("Set() error: key == ''\n") + } + return nil +} + +func (m *Mock) Get(keys []string) (map[string]interface{}, error) { + retMap := make(map[string]interface{}) + if len(keys) == 0 { + return nil, fmt.Errorf("Get() error: len(key) == 0\n") + } + + for _, key := range keys { + if key != "" { + retMap[key] = m.subsDB[key] + } else { + return nil, fmt.Errorf("Get() error: key == ''\n") + } + } + return retMap, nil +} + +func (m *Mock) GetAll() ([]string, error) { + + keys := []string{} + for key, _ := range m.subsDB { + keys = append(keys, key) + } + return keys, nil +} + +func (m *Mock) Remove(keys []string) error { + if len(keys) == 0 { + return fmt.Errorf("Remove() error: len(key) == 0\n") + } + subId64, err := strconv.ParseUint(keys[0], 10, 64) + if err != nil { + return fmt.Errorf("Remove() ParseUint() error: %s\n", err.Error()) + } + subId := uint32(subId64) + delete(m.subsDB, keys[0]) + delete(m.register, subId) + m.subIds = append(m.subIds, subId) + return nil +} + +func (m *Mock) RemoveAll() error { + for key := range m.subsDB { + subId64, err := strconv.ParseUint(key, 10, 64) + if err != nil { + return fmt.Errorf("RemoveAll() ParseUint() error: %s\n", err.Error()) + } + subId := uint32(subId64) + delete(m.subsDB, key) + delete(m.register, subId) + m.subIds = append(m.subIds, subId) + } + return nil +} diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 9342ccc..ec6f67a 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -22,6 +22,7 @@ package control import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + //"reflect" "sync" ) @@ -30,16 +31,21 @@ import ( // //----------------------------------------------------------------------------- type Subscription struct { - mutex sync.Mutex // Lock - valid bool // valid - registry *Registry // Registry - ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid) - Meid *xapp.RMRMeid // Meid/ RanName - EpList xapp.RmrEndpointList // Endpoints - TransLock sync.Mutex // Lock transactions, only one executed per time for subs - TheTrans TransactionIf // Ongoing transaction - SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information - SubRFMsg interface{} // Subscription information + mutex sync.Mutex // Lock + valid bool // valid + registry *Registry // Registry + ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid) + Meid *xapp.RMRMeid // Meid/ RanName + EpList xapp.RmrEndpointList // Endpoints + TransLock sync.Mutex // Lock transactions, only one executed per time for subs + TheTrans TransactionIf // Ongoing transaction + SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information + SubRFMsg interface{} // Subscription information + RetryFromXapp bool // Retry form xApp for subscription that already exist + SubRespRcvd bool // Subscription response received + DeleteFromDb bool // Delete subscription form db + NoRespToXapp bool // Send no response for subscription delete to xApp after restart + DoNotWaitSubResp bool // Test flag. Response is not waited for Subscription Request } func (s *Subscription) String() string { diff --git a/pkg/control/types.go b/pkg/control/types.go index 08bcda4..c8c09dc 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -33,3 +33,11 @@ type RequestId struct { func (rid *RequestId) String() string { return "reqid(" + rid.RequestId.String() + ")" } + +type Sdlnterface interface { + Set(pairs ...interface{}) error + Get(keys []string) (map[string]interface{}, error) + GetAll() ([]string, error) + Remove(keys []string) error + RemoveAll() error +} diff --git a/pkg/control/ut_ctrl_submgr_test.go b/pkg/control/ut_ctrl_submgr_test.go index 85cbe8f..f253424 100644 --- a/pkg/control/ut_ctrl_submgr_test.go +++ b/pkg/control/ut_ctrl_submgr_test.go @@ -20,7 +20,9 @@ package control import ( + "fmt" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststub" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "testing" "time" @@ -38,6 +40,8 @@ func createSubmgrControl(srcId teststub.RmrSrcId, rtgSvc teststub.RmrRtgSvc) *te mainCtrl = &testingSubmgrControl{} mainCtrl.RmrControl.Init("SUBMGRCTL", srcId, rtgSvc) mainCtrl.c = NewControl() + xapp.Logger.Debug("Replacing real db with test db") + mainCtrl.c.db = CreateMock() // This overrides real database for testing xapp.SetReadyCB(mainCtrl.ReadyCB, nil) go xapp.RunWithParams(mainCtrl.c, false) mainCtrl.WaitCB() @@ -45,6 +49,59 @@ func createSubmgrControl(srcId teststub.RmrSrcId, rtgSvc teststub.RmrRtgSvc) *te return mainCtrl } +func (mc *testingSubmgrControl) SimulateRestart(t *testing.T) { + mc.TestLog(t, "Simulating submgr restart") + mainCtrl.c.registry.subIds = nil + // Initialize subIds slice and subscription map + mainCtrl.c.registry.Initialize() + // Read subIds and subscriptions from database + subIds, register, err := mainCtrl.c.ReadAllSubscriptionsFromSdl() + if err != nil { + mc.TestError(t, "%v", err) + } else { + mainCtrl.c.registry.register = nil + mainCtrl.c.registry.subIds = subIds + mainCtrl.c.registry.register = register + + fmt.Println("register:") + for subId, subs := range register { + fmt.Println(" subId", subId) + fmt.Println(" subs.SubRespRcvd", subs.SubRespRcvd) + fmt.Printf(" subs %v\n", subs) + } + + fmt.Println("mainCtrl.c.registry.register:") + for subId, subs := range mainCtrl.c.registry.register { + fmt.Println(" subId", subId) + fmt.Println(" subs.SubRespRcvd", subs.SubRespRcvd) + fmt.Printf(" subs %v\n", subs) + } + } + go mainCtrl.c.HandleUncompletedSubscriptions(mainCtrl.c.registry.register) +} + +func (mc *testingSubmgrControl) SetResetTestFlag(t *testing.T, status bool) { + mc.TestLog(t, "ResetTestFlag set to %v", status) + mainCtrl.c.ResetTestFlag = status +} + +func (mc *testingSubmgrControl) removeExistingSubscriptions(t *testing.T) { + + mc.TestLog(t, "Removing existing subscriptions") + mainCtrl.c.RemoveAllSubscriptionsFromSdl() + mainCtrl.c.registry.subIds = nil + // Initialize subIds slice and subscription map + mainCtrl.c.registry.Initialize() +} + +func PringSubscriptionQueryResult(resp models.SubscriptionList) { + for _, item := range resp { + fmt.Printf("item.SubscriptionID %v\n", item.SubscriptionID) + fmt.Printf("item.Meid %v\n", item.Meid) + fmt.Printf("item.Endpoint %v\n", item.Endpoint) + } +} + func (mc *testingSubmgrControl) wait_registry_empty(t *testing.T, secs int) bool { cnt := int(0) i := 1 diff --git a/pkg/control/ut_messaging_test.go b/pkg/control/ut_messaging_test.go index 6068d66..f559f91 100644 --- a/pkg/control/ut_messaging_test.go +++ b/pkg/control/ut_messaging_test.go @@ -1721,3 +1721,248 @@ func TestSubReqRetransmissionWithSameSubIdDiffXid(t *testing.T) { e2termConn1.TestMsgChanEmpty(t) mainCtrl.wait_registry_empty(t, 10) } + +//----------------------------------------------------------------------------- +// TestSubReqNokAndSubDelOkWithRestartInMiddle +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | e2term | +// +-------+ +---------+ +---------+ +// | | | +// | SubReq | | +// |------------->| | +// | | | +// | | SubReq | +// | |------------->| +// | | | +// | | SubResp | +// | <----| +// | | +// | Submgr restart | +// | | +// | | | +// | | SubDelReq | +// | |------------->| +// | | | +// | | SubDelResp | +// | |<-------------| +// | | | +// +//----------------------------------------------------------------------------- + +func TestSubReqNokAndSubDelOkWithRestartInMiddle(t *testing.T) { + CaseBegin("TestSubReqNokAndSubDelOkWithRestartInMiddle") + + // Remove possible existing subscrition + mainCtrl.removeExistingSubscriptions(t) + + mainCtrl.SetResetTestFlag(t, true) // subs.DoNotWaitSubResp will be set TRUE for the subscription + xappConn1.SendSubsReq(t, nil, nil) + e2termConn1.RecvSubsReq(t) + mainCtrl.SetResetTestFlag(t, false) + + resp, _ := xapp.Subscription.QuerySubscriptions() + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"}) + e2SubsId := uint32(resp[0].SubscriptionID) + t.Logf("e2SubsId = %v", e2SubsId) + + mainCtrl.SimulateRestart(t) // This will trigger sending of SubDelReq + + delreq, delmsg := e2termConn1.RecvSubsDelReq(t) + e2termConn1.SendSubsDelResp(t, delreq, delmsg) + + // Wait that subs is cleaned + mainCtrl.wait_subs_clean(t, e2SubsId, 10) + + xappConn1.TestMsgChanEmpty(t) + xappConn2.TestMsgChanEmpty(t) + e2termConn1.TestMsgChanEmpty(t) + mainCtrl.wait_registry_empty(t, 10) +} + +//----------------------------------------------------------------------------- +// TestSubReqAndSubDelOkWithRestartInMiddle +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | e2term | +// +-------+ +---------+ +---------+ +// | | | +// | SubReq | | +// |------------->| | +// | | | +// | | SubReq | +// | |------------->| +// | | | +// | | SubResp | +// | |<-------------| +// | | | +// | SubResp | | +// |<-------------| | +// | | | +// | | +// | Submgr restart | +// | | +// | SubDelReq | | +// |------------->| | +// | | | +// | | SubDelReq | +// | |------------->| +// | | | +// | | SubDelResp | +// | |<-------------| +// | | | +// | SubDelResp | | +// |<-------------| | +// +//----------------------------------------------------------------------------- + +func TestSubReqAndSubDelOkWithRestartInMiddle(t *testing.T) { + CaseBegin("TestSubReqAndSubDelOkWithRestartInMiddle") + + cretrans := xappConn1.SendSubsReq(t, nil, nil) + crereq, cremsg := e2termConn1.RecvSubsReq(t) + e2termConn1.SendSubsResp(t, crereq, cremsg) + e2SubsId := xappConn1.RecvSubsResp(t, cretrans) + + // Check subscription + resp, _ := xapp.Subscription.QuerySubscriptions() + assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId)) + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"}) + + mainCtrl.SimulateRestart(t) + + // Check that subscription is restored correctly after restart + resp, _ = xapp.Subscription.QuerySubscriptions() + assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId)) + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"}) + + deltrans := xappConn1.SendSubsDelReq(t, nil, e2SubsId) + delreq, delmsg := e2termConn1.RecvSubsDelReq(t) + e2termConn1.SendSubsDelResp(t, delreq, delmsg) + xappConn1.RecvSubsDelResp(t, deltrans) + + //Wait that subs is cleaned + mainCtrl.wait_subs_clean(t, e2SubsId, 10) + + xappConn1.TestMsgChanEmpty(t) + xappConn2.TestMsgChanEmpty(t) + e2termConn1.TestMsgChanEmpty(t) + mainCtrl.wait_registry_empty(t, 10) +} + +//----------------------------------------------------------------------------- +// TestSubReqAndSubDelOkSameActionWithRestartsInMiddle +// +// stub stub +// +-------+ +-------+ +---------+ +---------+ +// | xapp2 | | xapp1 | | submgr | | e2term | +// +-------+ +-------+ +---------+ +---------+ +// | | | | +// | | | | +// | | | | +// | | SubReq1 | | +// | |------------->| | +// | | | | +// | | | SubReq1 | +// | | |------------->| +// | | | SubResp1 | +// | | |<-------------| +// | | SubResp1 | | +// | |<-------------| | +// | | | | +// | | +// | submgr restart | +// | | +// | | | | +// | | | | +// | SubReq2 | | +// |--------------------------->| | +// | | | | +// | SubResp2 | | +// |<---------------------------| | +// | | | | +// | | SubDelReq 1 | | +// | |------------->| | +// | | | | +// | | SubDelResp 1 | | +// | |<-------------| | +// | | | | +// | | | | +// | | +// | submgr restart | +// | | +// | | | | +// | SubDelReq 2 | | +// |--------------------------->| | +// | | | | +// | | | SubDelReq 2 | +// | | |------------->| +// | | | | +// | | | SubDelReq 2 | +// | | |------------->| +// | | | | +// | SubDelResp 2 | | +// |<---------------------------| | +// +//----------------------------------------------------------------------------- + +func TestSubReqAndSubDelOkSameActionWithRestartsInMiddle(t *testing.T) { + CaseBegin("TestSubReqAndSubDelOkSameActionWithRestartsInMiddle") + + //Req1 + rparams1 := &teststube2ap.E2StubSubsReqParams{} + rparams1.Init() + cretrans1 := xappConn1.SendSubsReq(t, rparams1, nil) + crereq1, cremsg1 := e2termConn1.RecvSubsReq(t) + e2termConn1.SendSubsResp(t, crereq1, cremsg1) + e2SubsId1 := xappConn1.RecvSubsResp(t, cretrans1) + + //Req2 + rparams2 := &teststube2ap.E2StubSubsReqParams{} + rparams2.Init() + cretrans2 := xappConn2.SendSubsReq(t, rparams2, nil) + e2SubsId2 := xappConn2.RecvSubsResp(t, cretrans2) + + // Check subscription + resp, _ := xapp.Subscription.QuerySubscriptions() //////////////////////////////// + assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId1)) + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560", "localhost:13660"}) + + mainCtrl.SimulateRestart(t) + + // Check that subscription is restored correctly after restart + resp, _ = xapp.Subscription.QuerySubscriptions() + assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId1)) + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560", "localhost:13660"}) + + //Del1 + deltrans1 := xappConn1.SendSubsDelReq(t, nil, e2SubsId1) + xapp.Logger.Debug("xappConn1.RecvSubsDelResp") + xappConn1.RecvSubsDelResp(t, deltrans1) + xapp.Logger.Debug("xappConn1.RecvSubsDelResp received") + + mainCtrl.SimulateRestart(t) + xapp.Logger.Debug("mainCtrl.SimulateRestart done") + + //Del2 + deltrans2 := xappConn2.SendSubsDelReq(t, nil, e2SubsId2) + delreq2, delmsg2 := e2termConn1.RecvSubsDelReq(t) + + e2termConn1.SendSubsDelResp(t, delreq2, delmsg2) + xappConn2.RecvSubsDelResp(t, deltrans2) + + //Wait that subs is cleaned + mainCtrl.wait_subs_clean(t, e2SubsId2, 10) + + xappConn1.TestMsgChanEmpty(t) + xappConn2.TestMsgChanEmpty(t) + e2termConn1.TestMsgChanEmpty(t) + mainCtrl.wait_registry_empty(t, 10) +} diff --git a/test/config-file.json b/test/config-file.json index a6cdac4..7d15902 100644 --- a/test/config-file.json +++ b/test/config-file.json @@ -31,6 +31,7 @@ "e2tSubDelReqTime_ms": 2000, "e2tRecvMsgTimeout_ms": 2000, "e2tMaxSubReqTryCount": 2, - "e2tMaxSubDelReqTryCount": 2 + "e2tMaxSubDelReqTryCount": 2, + "readSubsFromDb": "true" } } -- 2.16.6