Some cleaning and bug fixes 80/2380/4
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Thu, 30 Jan 2020 08:36:33 +0000 (10:36 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Fri, 31 Jan 2020 09:32:34 +0000 (11:32 +0200)
Change-Id: Ic7660b9ec386b152262ac8502cc1ebadca8c56b6
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
12 files changed:
pkg/control/client.go
pkg/control/control.go
pkg/control/registry.go
pkg/control/subscription.go
pkg/control/tracker.go
pkg/control/transaction.go
pkg/control/types.go
pkg/control/types_test.go
pkg/control/ut_ctrl_submgr_test.go
pkg/control/ut_messaging_test.go
pkg/control/ut_stub_e2term_test.go
pkg/control/ut_stub_xapp_test.go

index 73e2ee0..fdafcb6 100644 (file)
@@ -34,13 +34,12 @@ import (
 //
 //-----------------------------------------------------------------------------
 type SubRouteInfo struct {
-       Command Action
-       EpList  RmrEndpointList
-       SubID   uint16
+       EpList RmrEndpointList
+       SubID  uint16
 }
 
 func (sri *SubRouteInfo) String() string {
-       return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/" + sri.Command.String() + "/[" + sri.EpList.String() + "])"
+       return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/[" + sri.EpList.String() + "])"
 }
 
 //-----------------------------------------------------------------------------
@@ -50,39 +49,48 @@ type RtmgrClient struct {
        rtClient *rtmgrclient.RoutingManager
 }
 
-func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
+func (rc *RtmgrClient) SubscriptionRequestCreate(subRouteAction SubRouteInfo) error {
        subID := int32(subRouteAction.SubID)
-       xapp.Logger.Debug("%s ongoing", subRouteAction.String())
-       var err error
-       switch subRouteAction.Command {
-       case CREATE:
-               createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
-               createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-               createHandle.WithXappSubscriptionData(&createData)
-               _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle)
-       case DELETE:
-               deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
-               deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-               deleteHandle.WithXappSubscriptionData(&deleteData)
-               _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle)
-       case UPDATE:
-               var updateData rtmgr_models.XappList
-               for i := range subRouteAction.EpList.Endpoints {
-                       updateData = append(updateData, &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port})
-               }
-               updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-               updateHandle.WithSubscriptionID(subRouteAction.SubID)
-               updateHandle.WithXappList(updateData)
-               _, err = rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle)
-
-       default:
-               return fmt.Errorf("%s unknown", subRouteAction.String())
+       xapp.Logger.Debug("CREATE %s ongoing", subRouteAction.String())
+       createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
+       createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+       createHandle.WithXappSubscriptionData(&createData)
+       _, err := rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle)
+       if err != nil && !(strings.Contains(err.Error(), "status 200")) {
+               return fmt.Errorf("CREATE %s failed with error: %s", subRouteAction.String(), err.Error())
        }
+       xapp.Logger.Debug("CREATE %s successful", subRouteAction.String())
+       return nil
+}
 
+func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
+       xapp.Logger.Debug("UPDATE %s ongoing", subRouteAction.String())
+       var updateData rtmgr_models.XappList
+       for i := range subRouteAction.EpList.Endpoints {
+               updateData = append(updateData, &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port})
+       }
+       updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+       updateHandle.WithSubscriptionID(subRouteAction.SubID)
+       updateHandle.WithXappList(updateData)
+       _, err := rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle)
        if err != nil && !(strings.Contains(err.Error(), "status 200")) {
-               return fmt.Errorf("%s failed with error: %s", subRouteAction.String(), err.Error())
+               return fmt.Errorf("UPDATE %s failed with error: %s", subRouteAction.String(), err.Error())
        }
-       xapp.Logger.Debug("%s successful", subRouteAction.String())
+       xapp.Logger.Debug("UPDATE %s successful", subRouteAction.String())
        return nil
 
 }
+
+func (rc *RtmgrClient) SubscriptionRequestDelete(subRouteAction SubRouteInfo) error {
+       subID := int32(subRouteAction.SubID)
+       xapp.Logger.Debug("DELETE %s ongoing", subRouteAction.String())
+       deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
+       deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+       deleteHandle.WithXappSubscriptionData(&deleteData)
+       _, _, err := rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle)
+       if err != nil && !(strings.Contains(err.Error(), "status 200")) {
+               return fmt.Errorf("DELETE %s failed with error: %s", subRouteAction.String(), err.Error())
+       }
+       xapp.Logger.Debug("DELETE %s successful", subRouteAction.String())
+       return nil
+}
index dee7e65..1d64f3c 100755 (executable)
@@ -57,13 +57,6 @@ type RMRMeid struct {
        RanName string
 }
 
-const (
-       CREATE Action = 0
-       UPDATE Action = 1
-       NONE   Action = 2
-       DELETE Action = 3
-)
-
 func init() {
        xapp.Logger.Info("SUBMGR")
        viper.AutomaticEnv()
@@ -120,10 +113,10 @@ func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
        return
 }
 
-func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
+func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
        params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = trans.GetMtype()
-       params.SubId = int(subs.GetSubId())
+       params.SubId = int(subs.GetReqId().Seq)
        params.Xid = ""
        params.Meid = subs.GetMeid()
        params.Src = ""
@@ -131,13 +124,14 @@ func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (
        params.Payload = trans.Payload.Buf
        params.Mbuf = nil
 
-       return c.rmrSendRaw(desc, params)
+       return c.rmrSendRaw("MSG to E2T:"+desc+":"+trans.String(), params)
 }
 
-func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
+func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
+
        params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = trans.GetMtype()
-       params.SubId = int(subs.GetSubId())
+       params.SubId = int(subs.GetReqId().Seq)
        params.Xid = trans.GetXid()
        params.Meid = trans.GetMeid()
        params.Src = ""
@@ -145,7 +139,7 @@ func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Trans
        params.Payload = trans.Payload.Buf
        params.Mbuf = nil
 
-       return c.rmrSendRaw(desc, params)
+       return c.rmrSendRaw("MSG to XAPP:"+desc+":"+trans.String(), params)
 }
 
 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
@@ -153,6 +147,7 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) {
        params.Mbuf = nil
        msg := &RMRParams{params}
        c.msgCounter++
+
        switch msg.Mtype {
        case xapp.RICMessageTypes["RIC_SUB_REQ"]:
                go c.handleXAPPSubscriptionRequest(msg)
@@ -172,15 +167,12 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) {
 
        return nil
 }
-func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
+
+func idstring(err error, entries ...fmt.Stringer) string {
        var retval string = ""
        var filler string = ""
-       if trans != nil {
-               retval += filler + trans.String()
-               filler = " "
-       }
-       if subs != nil {
-               retval += filler + subs.String()
+       for _, entry := range entries {
+               retval += filler + entry.String()
                filler = " "
        }
        if err != nil {
@@ -195,24 +187,30 @@ func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
 // handle from XAPP Subscription Request
 //------------------------------------------------------------------
 func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
-       xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
+       xapp.Logger.Info("MSG from XAPP: %s", params.String())
 
        subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
        if err != nil {
-               xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
                return
        }
 
-       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
-       if err != nil {
-               xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
+       trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
+       if trans == nil {
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
                return
        }
        defer trans.Release()
 
+       err = c.tracker.Track(trans)
+       if err != nil {
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
+               return
+       }
+
        subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
        if err != nil {
-               xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
                return
        }
 
@@ -228,44 +226,50 @@ func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
                case *e2ap.E2APSubscriptionResponse:
                        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
                        if err == nil {
-                               c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
+                               c.rmrSendToXapp("", subs, trans)
                                return
                        }
                case *e2ap.E2APSubscriptionFailure:
                        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
                        if err == nil {
-                               c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
+                               c.rmrSendToXapp("", subs, trans)
                        }
-                       return
                default:
                        break
                }
        }
-       xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
+       xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
+       go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
 }
 
 //-------------------------------------------------------------------
 // handle from XAPP Subscription Delete Request
 //------------------------------------------------------------------
 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
-       xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
+       xapp.Logger.Info("MSG from XAPP: %s", params.String())
 
        subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
        if err != nil {
-               xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
                return
        }
 
-       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
-       if err != nil {
-               xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
+       trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
+       if trans == nil {
+               xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
                return
        }
        defer trans.Release()
 
-       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
+       err = c.tracker.Track(trans)
+       if err != nil {
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
+               return
+       }
+
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
        if err != nil {
-               xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
+               xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
                return
        }
 
@@ -275,89 +279,65 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
        go c.handleSubscriptionDelete(subs, trans)
        trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
 
+       xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
+
        // Whatever is received send ok delete response
        subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
-       subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
-       subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
+       subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
        subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
        if err == nil {
-               c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
+               c.rmrSendToXapp("", subs, trans)
        }
+
+       go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
 }
 
 //-------------------------------------------------------------------
 // SUBS CREATE Handling
 //-------------------------------------------------------------------
-func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
+func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
 
-       trans := c.tracker.NewTransaction(subs.GetMeid())
+       trans := c.tracker.NewSubsTransaction(subs)
        subs.WaitTransactionTurn(trans)
        defer subs.ReleaseTransactionTurn(trans)
        defer trans.Release()
 
-       xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+       xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
 
-       subs.mutex.Lock()
-       if subs.SubRespMsg != nil {
-               xapp.Logger.Debug("SUBS-SubReq: Handling (immediate resp response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
-               parentTrans.SendEvent(subs.SubRespMsg, 0)
-               subs.mutex.Unlock()
-               return
-       }
-       if subs.SubFailMsg != nil {
-               xapp.Logger.Debug("SUBS-SubReq: Handling (immediate fail response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
-               parentTrans.SendEvent(subs.SubFailMsg, 0)
-               subs.mutex.Unlock()
-               go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
-               return
-       }
-       if subs.valid == false {
-               xapp.Logger.Debug("SUBS-SubReq: Handling (immediate nil response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
-               parentTrans.SendEvent(nil, 0)
-               subs.mutex.Unlock()
-               go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
-               return
-       }
-       subs.mutex.Unlock()
-
-       event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
-       switch themsg := event.(type) {
-       case *e2ap.E2APSubscriptionResponse:
-               subs.mutex.Lock()
-               subs.SubRespMsg = themsg
-               subs.mutex.Unlock()
-               parentTrans.SendEvent(event, 0)
-               return
-       case *e2ap.E2APSubscriptionFailure:
-               subs.mutex.Lock()
-               subs.SubFailMsg = themsg
-               subs.mutex.Unlock()
-               parentTrans.SendEvent(event, 0)
-       default:
-               xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
-               subs.mutex.Lock()
-               subs.valid = false
-               subs.mutex.Unlock()
-               c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
-               parentTrans.SendEvent(nil, 0)
+       subRfMsg, valid := subs.GetCachedResponse()
+       if subRfMsg == nil && valid == true {
+               event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
+               switch event.(type) {
+               case *e2ap.E2APSubscriptionResponse:
+                       subRfMsg, valid = subs.SetCachedResponse(event, true)
+               case *e2ap.E2APSubscriptionFailure:
+                       subRfMsg, valid = subs.SetCachedResponse(event, false)
+               default:
+                       xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+                       subRfMsg, valid = subs.SetCachedResponse(nil, false)
+                       c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+               }
+               xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
+       } else {
+               xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
        }
 
-       go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+       parentTrans.SendEvent(subRfMsg, 0)
 }
 
 //-------------------------------------------------------------------
 // SUBS DELETE Handling
 //-------------------------------------------------------------------
 
-func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
+func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
 
-       trans := c.tracker.NewTransaction(subs.GetMeid())
+       trans := c.tracker.NewSubsTransaction(subs)
        subs.WaitTransactionTurn(trans)
        defer subs.ReleaseTransactionTurn(trans)
        defer trans.Release()
 
-       xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+       xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
 
        subs.mutex.Lock()
        if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
@@ -369,41 +349,37 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran
        }
 
        subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
-       subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
-       subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
+       subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
        subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
        parentTrans.SendEvent(subDelRespMsg, 0)
-
-       go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
 }
 
 //-------------------------------------------------------------------
 // send to E2T Subscription Request
 //-------------------------------------------------------------------
-func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
        var err error
        var event interface{} = nil
        var timedOut bool = false
 
        subReqMsg := subs.SubReqMsg
-       subReqMsg.RequestId.Id = 123
-       subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+       subReqMsg.RequestId = subs.GetReqId().RequestId
        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
        if err != nil {
-               xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+               xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
                return event
        }
 
        for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
-               desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
-               c.rmrSend(desc, subs, trans)
+               desc := fmt.Sprintf("(retry %d)", retries)
+               c.rmrSendToE2T(desc, subs, trans)
                event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
                if timedOut {
                        continue
                }
                break
        }
-       xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+       xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
        return event
 }
 
@@ -411,31 +387,30 @@ func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transact
 // send to E2T Subscription Delete Request
 //-------------------------------------------------------------------
 
-func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
        var err error
        var event interface{}
        var timedOut bool
 
        subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
-       subDelReqMsg.RequestId.Id = 123
-       subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+       subDelReqMsg.RequestId = subs.GetReqId().RequestId
        subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
        if err != nil {
-               xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+               xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
                return event
        }
 
        for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
-               desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
-               c.rmrSend(desc, subs, trans)
+               desc := fmt.Sprintf("(retry %d)", retries)
+               c.rmrSendToE2T(desc, subs, trans)
                event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
                if timedOut {
                        continue
                }
                break
        }
-       xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+       xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
        return event
 }
 
@@ -443,27 +418,27 @@ func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Tr
 // handle from E2T Subscription Reponse
 //-------------------------------------------------------------------
 func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
-       xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
+       xapp.Logger.Info("MSG from E2T: %s", params.String())
        subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
        if err != nil {
-               xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
                return
        }
-       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
        if err != nil {
-               xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
                return
        }
        trans := subs.GetTransaction()
        if trans == nil {
                err = fmt.Errorf("Ongoing transaction not found")
-               xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
+               xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
                return
        }
        sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
        if sendOk == false {
                err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
-               xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
+               xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
        }
        return
 }
@@ -472,27 +447,27 @@ func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
 // handle from E2T Subscription Failure
 //-------------------------------------------------------------------
 func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
-       xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
+       xapp.Logger.Info("MSG from E2T: %s", params.String())
        subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
        if err != nil {
-               xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
                return
        }
-       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
        if err != nil {
-               xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
                return
        }
        trans := subs.GetTransaction()
        if trans == nil {
                err = fmt.Errorf("Ongoing transaction not found")
-               xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
+               xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
                return
        }
        sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
        if sendOk == false {
                err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
-               xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
+               xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
        }
        return
 }
@@ -501,27 +476,27 @@ func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
 // handle from E2T Subscription Delete Response
 //-------------------------------------------------------------------
 func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
-       xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
+       xapp.Logger.Info("MSG from E2T: %s", params.String())
        subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
                return
        }
-       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
        if err != nil {
-               xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
                return
        }
        trans := subs.GetTransaction()
        if trans == nil {
                err = fmt.Errorf("Ongoing transaction not found")
-               xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
+               xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
                return
        }
        sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
        if sendOk == false {
                err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
-               xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
+               xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
        }
        return
 }
@@ -530,27 +505,27 @@ func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err er
 // handle from E2T Subscription Delete Failure
 //-------------------------------------------------------------------
 func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
-       xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
+       xapp.Logger.Info("MSG from E2T: %s", params.String())
        subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
        if err != nil {
-               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
                return
        }
-       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
        if err != nil {
-               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
                return
        }
        trans := subs.GetTransaction()
        if trans == nil {
                err = fmt.Errorf("Ongoing transaction not found")
-               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
                return
        }
        sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
        if sendOk == false {
                err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
-               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
        }
        return
 }
index 6abdcdb..e00062b 100644 (file)
@@ -30,22 +30,23 @@ import (
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
+
 type Registry struct {
        mutex       sync.Mutex
-       register    map[uint16]*Subscription
-       subIds      []uint16
+       register    map[uint32]*Subscription
+       subIds      []uint32
        rtmgrClient *RtmgrClient
 }
 
 func (r *Registry) Initialize() {
-       r.register = make(map[uint16]*Subscription)
-       var i uint16
+       r.register = make(map[uint32]*Subscription)
+       var i uint32
        for i = 0; i < 65535; i++ {
                r.subIds = append(r.subIds, i+1)
        }
 }
 
-func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
+func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
        if len(r.subIds) > 0 {
                sequenceNumber := r.subIds[0]
                r.subIds = r.subIds[1:]
@@ -55,14 +56,15 @@ func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscrip
                }
                subs := &Subscription{
                        registry:  r,
-                       Seq:       sequenceNumber,
                        Meid:      trans.Meid,
                        SubReqMsg: subReqMsg,
                        valid:     true,
                }
+               subs.ReqId.Id = 123
+               subs.ReqId.Seq = sequenceNumber
 
                if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
-                       r.subIds = append(r.subIds, subs.Seq)
+                       r.subIds = append(r.subIds, subs.ReqId.Seq)
                        return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
                }
 
@@ -71,9 +73,10 @@ func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscrip
        return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
 }
 
-func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
+func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
+
        for _, subs := range r.register {
-               if subs.IsSame(trans, subReqMsg) {
+               if subs.IsMergeable(trans, subReqMsg) {
 
                        //
                        // check if there has been race conditions
@@ -84,6 +87,11 @@ func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubs
                                subs.mutex.Unlock()
                                continue
                        }
+                       // If size is zero, entry is to be deleted
+                       if subs.EpList.Size() == 0 {
+                               subs.mutex.Unlock()
+                               continue
+                       }
                        // try to add to endpointlist.
                        if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
                                subs.mutex.Unlock()
@@ -91,15 +99,14 @@ func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubs
                        }
                        subs.mutex.Unlock()
 
-                       //Race collision during parallel incoming and deleted
-                       xapp.Logger.Debug("Registry: Identical subs found %s for %s", subs.String(), trans.String())
+                       xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
                        return subs
                }
        }
        return nil
 }
 
-func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
+func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
        var err error
        var newAlloc bool
        r.mutex.Lock()
@@ -128,31 +135,31 @@ func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2AP
        // Subscription route updates
        //
        if epamount == 1 {
-               subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
-               err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+               subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
+               err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
        } else {
-               subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
+               subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
                err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        }
        r.mutex.Lock()
 
        if err != nil {
                if newAlloc {
-                       r.subIds = append(r.subIds, subs.Seq)
+                       r.subIds = append(r.subIds, subs.ReqId.Seq)
                }
                return nil, err
        }
 
        if newAlloc {
-               r.register[subs.Seq] = subs
+               r.register[subs.ReqId.Seq] = subs
        }
-       xapp.Logger.Debug("Registry: Create %s", subs.String())
+       xapp.Logger.Debug("CREATE %s", subs.String())
        xapp.Logger.Debug("Registry: substable=%v", r.register)
        return subs, nil
 }
 
-// TODO: Needs better logic when there is concurrent calls
-func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
+// TODO: Works with concurrent calls, but check if can be improved
+func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
 
        r.mutex.Lock()
        defer r.mutex.Unlock()
@@ -161,17 +168,12 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction
 
        delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
        epamount := subs.EpList.Size()
+       seqId := subs.ReqId.Seq
 
-       //
-       // If last endpoint remove from register map
-       //
-       if epamount == 0 {
-               if _, ok := r.register[subs.Seq]; ok {
-                       xapp.Logger.Debug("Registry: Delete %s", subs.String())
-                       delete(r.register, subs.Seq)
-                       xapp.Logger.Debug("Registry: substable=%v", r.register)
-               }
+       if delStatus == false {
+               return nil
        }
+
        r.mutex.Unlock()
 
        //
@@ -183,35 +185,38 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction
                subs.mutex.Lock()
        }
 
-       xapp.Logger.Info("Registry: Cleaning %s", subs.String())
+       xapp.Logger.Info("CLEAN %s", subs.String())
 
        //
        // Subscription route updates
        //
-       if delStatus {
-               if epamount == 0 {
-                       tmpList := RmrEndpointList{}
-                       tmpList.AddEndpoint(trans.GetEndpoint())
-                       subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
-                       r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-               } else {
-                       subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
-                       r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-               }
+       if epamount == 0 {
+               tmpList := RmrEndpointList{}
+               tmpList.AddEndpoint(trans.GetEndpoint())
+               subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
+               r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
+       } else if subs.EpList.Size() > 0 {
+               subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
+               r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        }
 
        r.mutex.Lock()
        //
-       // If last endpoint free seq nro
+       // If last endpoint, release and free seqid
        //
        if epamount == 0 {
-               r.subIds = append(r.subIds, subs.Seq)
+               if _, ok := r.register[seqId]; ok {
+                       xapp.Logger.Debug("RELEASE %s", subs.String())
+                       delete(r.register, seqId)
+                       xapp.Logger.Debug("Registry: substable=%v", r.register)
+               }
+               r.subIds = append(r.subIds, seqId)
        }
 
        return nil
 }
 
-func (r *Registry) GetSubscription(sn uint16) *Subscription {
+func (r *Registry) GetSubscription(sn uint32) *Subscription {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if _, ok := r.register[sn]; ok {
@@ -220,7 +225,7 @@ func (r *Registry) GetSubscription(sn uint16) *Subscription {
        return nil
 }
 
-func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
+func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        for _, id := range ids {
index 5bfe2e1..c2b5283 100644 (file)
@@ -22,7 +22,6 @@ package control
 import (
        "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-       "strconv"
        "sync"
 )
 
@@ -30,62 +29,65 @@ import (
 //
 //-----------------------------------------------------------------------------
 type Subscription struct {
-       mutex      sync.Mutex                     // Lock
-       valid      bool                           // valid
-       registry   *Registry                      // Registry
-       Seq        uint16                         // SubsId
-       Meid       *xapp.RMRMeid                  // Meid/ RanName
-       EpList     RmrEndpointList                // Endpoints
-       TransLock  sync.Mutex                     // Lock transactions, only one executed per time for subs
-       TheTrans   *Transaction                   // Ongoing transaction from xapp
-       SubReqMsg  *e2ap.E2APSubscriptionRequest  // Subscription information
-       SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information
-       SubFailMsg *e2ap.E2APSubscriptionFailure  // Subscription information
+       mutex     sync.Mutex                    // Lock
+       valid     bool                          // valid
+       registry  *Registry                     // Registry
+       ReqId     RequestId                     // ReqId (Requestor Id + Seq Nro a.k.a subsid)
+       Meid      *xapp.RMRMeid                 // Meid/ RanName
+       EpList    RmrEndpointList               // Endpoints
+       TransLock sync.Mutex                    // Lock transactions, only one executed per time for subs
+       TheTrans  TransactionIf                 // Ongoing transaction
+       SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information
+       SubRFMsg  interface{}                   // Subscription information
 }
 
 func (s *Subscription) String() string {
-       return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
+       return "subs(" + s.ReqId.String() + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
 }
 
-func (s *Subscription) GetSubId() uint16 {
+func (s *Subscription) GetCachedResponse() (interface{}, bool) {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       return s.Seq
+       return s.SubRFMsg, s.valid
 }
 
-func (s *Subscription) GetMeid() *xapp.RMRMeid {
+func (s *Subscription) SetCachedResponse(subRFMsg interface{}, valid bool) (interface{}, bool) {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       if s.Meid != nil {
-               return s.Meid
-       }
-       return nil
+       s.SubRFMsg = subRFMsg
+       s.valid = valid
+       return s.SubRFMsg, s.valid
 }
 
-func (s *Subscription) IsTransactionReserved() bool {
+func (s *Subscription) GetReqId() *RequestId {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       if s.TheTrans != nil {
-               return true
-       }
-       return false
+       return &s.ReqId
+}
 
+func (s *Subscription) GetMeid() *xapp.RMRMeid {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       if s.Meid != nil {
+               return s.Meid
+       }
+       return nil
 }
 
-func (s *Subscription) GetTransaction() *Transaction {
+func (s *Subscription) GetTransaction() TransactionIf {
        s.mutex.Lock()
        defer s.mutex.Unlock()
        return s.TheTrans
 }
 
-func (s *Subscription) WaitTransactionTurn(trans *Transaction) {
+func (s *Subscription) WaitTransactionTurn(trans TransactionIf) {
        s.TransLock.Lock()
        s.mutex.Lock()
        s.TheTrans = trans
        s.mutex.Unlock()
 }
 
-func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) {
+func (s *Subscription) ReleaseTransactionTurn(trans TransactionIf) {
        s.mutex.Lock()
        if trans != nil && trans == s.TheTrans {
                s.TheTrans = nil
@@ -94,7 +96,7 @@ func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) {
        s.TransLock.Unlock()
 }
 
-func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) bool {
+func (s *Subscription) IsMergeable(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) bool {
        s.mutex.Lock()
        defer s.mutex.Unlock()
 
@@ -110,15 +112,6 @@ func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscripti
                return false
        }
 
-       if s.EpList.Size() == 0 {
-               return false
-       }
-
-       //Somehow special case ... ?
-       if s.EpList.HasEndpoint(trans.GetEndpoint()) == true {
-               return false
-       }
-
        // EventTrigger check
        if s.SubReqMsg.EventTriggerDefinition.InterfaceDirection != subReqMsg.EventTriggerDefinition.InterfaceDirection ||
                s.SubReqMsg.EventTriggerDefinition.ProcedureCode != subReqMsg.EventTriggerDefinition.ProcedureCode ||
@@ -156,6 +149,10 @@ func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscripti
                                return false
                        }
 
+                       if acts.ActionType != e2ap.E2AP_ActionTypeReport {
+                               return false
+                       }
+
                        if acts.ActionDefinition.Present != actt.ActionDefinition.Present ||
                                acts.ActionDefinition.StyleId != actt.ActionDefinition.StyleId ||
                                acts.ActionDefinition.ParamId != actt.ActionDefinition.ParamId {
index c16a76a..2f54237 100644 (file)
@@ -30,71 +30,81 @@ import (
 //-----------------------------------------------------------------------------
 type Tracker struct {
        mutex                sync.Mutex
-       transactionXappTable map[TransactionXappKey]*Transaction
+       transactionXappTable map[TransactionXappKey]*TransactionXapp
        transSeq             uint64
 }
 
 func (t *Tracker) Init() {
-       t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
+       t.transactionXappTable = make(map[TransactionXappKey]*TransactionXapp)
 }
 
-func (t *Tracker) NewTransactionFromSkel(transSkel *Transaction) *Transaction {
+func (t *Tracker) initTransaction(transBase *Transaction) {
        t.mutex.Lock()
        defer t.mutex.Unlock()
-       trans := transSkel
-       if trans == nil {
-               trans = &Transaction{}
-       }
-       trans.EventChan = make(chan interface{})
-       trans.tracker = t
-       trans.Seq = t.transSeq
+       transBase.EventChan = make(chan interface{})
+       transBase.tracker = t
+       transBase.Seq = t.transSeq
        t.transSeq++
-       xapp.Logger.Debug("Transaction: Create %s", trans.String())
-       return trans
 }
 
-func (t *Tracker) NewTransaction(meid *xapp.RMRMeid) *Transaction {
-       trans := &Transaction{}
-       trans.Meid = meid
-       trans = t.NewTransactionFromSkel(trans)
+func (t *Tracker) NewSubsTransaction(subs *Subscription) *TransactionSubs {
+       trans := &TransactionSubs{}
+       trans.Meid = subs.GetMeid()
+       rid := subs.GetReqId()
+       if rid != nil {
+               trans.ReqId = *rid
+       }
+       t.initTransaction(&trans.Transaction)
+       xapp.Logger.Debug("CREATE %s", trans.String())
        return trans
 }
 
-func (t *Tracker) TrackTransaction(
+func (t *Tracker) NewXappTransaction(
        endpoint *RmrEndpoint,
        xid string,
-       meid *xapp.RMRMeid) (*Transaction, error) {
-
-       if endpoint == nil {
-               err := fmt.Errorf("Tracker: No valid endpoint given")
-               return nil, err
-       }
+       reqId *RequestId,
+       meid *xapp.RMRMeid) *TransactionXapp {
 
-       trans := &Transaction{}
+       trans := &TransactionXapp{}
        trans.XappKey = &TransactionXappKey{*endpoint, xid}
        trans.Meid = meid
-       trans = t.NewTransactionFromSkel(trans)
+       if reqId != nil {
+               trans.ReqId = *reqId
+       }
+       t.initTransaction(&trans.Transaction)
+       xapp.Logger.Debug("CREATE %s", trans.String())
+       return trans
+}
+
+func (t *Tracker) Track(trans *TransactionXapp) error {
+
+       if trans.GetEndpoint() == nil {
+               err := fmt.Errorf("Tracker: No valid endpoint given in %s", trans.String())
+               return err
+       }
 
        t.mutex.Lock()
        defer t.mutex.Unlock()
 
-       if othtrans, ok := t.transactionXappTable[*trans.XappKey]; ok {
-               err := fmt.Errorf("Tracker: %s is ongoing, %s not created ", othtrans, trans)
-               return nil, err
+       theKey := *trans.XappKey
+
+       if othtrans, ok := t.transactionXappTable[theKey]; ok {
+               err := fmt.Errorf("Tracker: %s is ongoing, not tracking %s", othtrans, trans)
+               return err
        }
 
        trans.tracker = t
-       t.transactionXappTable[*trans.XappKey] = trans
-       xapp.Logger.Debug("Tracker: Add %s", trans.String())
+       t.transactionXappTable[theKey] = trans
+       xapp.Logger.Debug("Tracker: Append %s", trans.String())
        //xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
-       return trans, nil
+       return nil
 }
 
-func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*Transaction, error) {
+func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*TransactionXapp, error) {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if trans, ok2 := t.transactionXappTable[xappKey]; ok2 {
-               xapp.Logger.Debug("Tracker: Delete %s", trans.String())
+               xapp.Logger.Debug("Tracker: Remove %s", trans.String())
                delete(t.transactionXappTable, xappKey)
                //xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
                return trans, nil
index 735954e..b2b838b 100644 (file)
@@ -30,18 +30,33 @@ import (
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
+type TransactionIf interface {
+       String() string
+       Release()
+       SendEvent(interface{}, time.Duration) (bool, bool)
+       WaitEvent(time.Duration) (interface{}, bool)
+}
 
-type TransactionBase struct {
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+type Transaction struct {
        mutex     sync.Mutex         //
-       Seq       uint64             //
+       Seq       uint64             //transaction sequence
        tracker   *Tracker           //tracker instance
        Meid      *xapp.RMRMeid      //meid transaction related
+       ReqId     RequestId          //
        Mtype     int                //Encoded message type to be send
        Payload   *packer.PackedData //Encoded message to be send
        EventChan chan interface{}
 }
 
-func (t *TransactionBase) SendEvent(event interface{}, waittime time.Duration) (bool, bool) {
+func (t *Transaction) String() string {
+       return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + t.ReqId.String() + ")"
+}
+
+func (t *Transaction) SendEvent(event interface{}, waittime time.Duration) (bool, bool) {
        if waittime > 0 {
                select {
                case t.EventChan <- event:
@@ -55,7 +70,7 @@ func (t *TransactionBase) SendEvent(event interface{}, waittime time.Duration) (
        return true, false
 }
 
-func (t *TransactionBase) WaitEvent(waittime time.Duration) (interface{}, bool) {
+func (t *Transaction) WaitEvent(waittime time.Duration) (interface{}, bool) {
        if waittime > 0 {
                select {
                case event := <-t.EventChan:
@@ -68,13 +83,19 @@ func (t *TransactionBase) WaitEvent(waittime time.Duration) (interface{}, bool)
        return event, false
 }
 
-func (t *TransactionBase) GetMtype() int {
+func (t *Transaction) GetReqId() *RequestId {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return &t.ReqId
+}
+
+func (t *Transaction) GetMtype() int {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        return t.Mtype
 }
 
-func (t *TransactionBase) GetMeid() *xapp.RMRMeid {
+func (t *Transaction) GetMeid() *xapp.RMRMeid {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if t.Meid != nil {
@@ -83,12 +104,30 @@ func (t *TransactionBase) GetMeid() *xapp.RMRMeid {
        return nil
 }
 
-func (t *TransactionBase) GetPayload() *packer.PackedData {
+func (t *Transaction) GetPayload() *packer.PackedData {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        return t.Payload
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type TransactionSubs struct {
+       Transaction //
+}
+
+func (t *TransactionSubs) String() string {
+       return "transsubs(" + t.Transaction.String() + ")"
+}
+
+func (t *TransactionSubs) Release() {
+       t.mutex.Lock()
+       xapp.Logger.Debug("RELEASE %s", t.String())
+       t.tracker = nil
+       t.mutex.Unlock()
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
@@ -104,20 +143,20 @@ func (key *TransactionXappKey) String() string {
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type Transaction struct {
-       TransactionBase                     //
-       XappKey         *TransactionXappKey //
+type TransactionXapp struct {
+       Transaction                     //
+       XappKey     *TransactionXappKey //
 }
 
-func (t *Transaction) String() string {
+func (t *TransactionXapp) String() string {
        var transkey string = "transkey(N/A)"
        if t.XappKey != nil {
                transkey = t.XappKey.String()
        }
-       return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + transkey + ")"
+       return "transxapp(" + t.Transaction.String() + "/" + transkey + ")"
 }
 
-func (t *Transaction) GetEndpoint() *RmrEndpoint {
+func (t *TransactionXapp) GetEndpoint() *RmrEndpoint {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if t.XappKey != nil {
@@ -126,7 +165,7 @@ func (t *Transaction) GetEndpoint() *RmrEndpoint {
        return nil
 }
 
-func (t *Transaction) GetXid() string {
+func (t *TransactionXapp) GetXid() string {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if t.XappKey != nil {
@@ -135,7 +174,7 @@ func (t *Transaction) GetXid() string {
        return ""
 }
 
-func (t *Transaction) GetSrc() string {
+func (t *TransactionXapp) GetSrc() string {
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if t.XappKey != nil {
@@ -144,9 +183,9 @@ func (t *Transaction) GetSrc() string {
        return ""
 }
 
-func (t *Transaction) Release() {
+func (t *TransactionXapp) Release() {
        t.mutex.Lock()
-       xapp.Logger.Debug("Transaction: Release %s", t.String())
+       xapp.Logger.Debug("RELEASE %s", t.String())
        tracker := t.tracker
        xappkey := t.XappKey
        t.tracker = nil
index 164e801..4d318e0 100644 (file)
@@ -22,6 +22,7 @@ package control
 import (
        "bytes"
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "strconv"
        "strings"
@@ -30,10 +31,12 @@ import (
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type RmrDatagram struct {
-       MessageType    int
-       SubscriptionId uint16
-       Payload        []byte
+type RequestId struct {
+       e2ap.RequestId
+}
+
+func (rid *RequestId) String() string {
+       return "reqid(" + rid.RequestId.String() + ")"
 }
 
 //-----------------------------------------------------------------------------
@@ -45,7 +48,15 @@ type RmrEndpoint struct {
 }
 
 func (endpoint RmrEndpoint) String() string {
-       return endpoint.Get()
+       return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
+}
+
+func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool {
+       if (endpoint.Addr == ep.Addr) &&
+               (endpoint.Port == ep.Port) {
+               return true
+       }
+       return false
 }
 
 func (endpoint *RmrEndpoint) GetAddr() string {
@@ -56,10 +67,6 @@ func (endpoint *RmrEndpoint) GetPort() uint16 {
        return endpoint.Port
 }
 
-func (endpoint *RmrEndpoint) Get() string {
-       return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
-}
-
 func (endpoint *RmrEndpoint) Set(src string) bool {
        elems := strings.Split(src, ":")
        if len(elems) == 2 {
@@ -82,11 +89,10 @@ type RmrEndpointList struct {
 }
 
 func (eplist *RmrEndpointList) String() string {
+       tmpList := eplist.Endpoints
        valuesText := []string{}
-       for i := range eplist.Endpoints {
-               ep := eplist.Endpoints[i]
-               text := ep.String()
-               valuesText = append(valuesText, text)
+       for i := range tmpList {
+               valuesText = append(valuesText, tmpList[i].String())
        }
        return strings.Join(valuesText, ",")
 }
@@ -97,7 +103,7 @@ func (eplist *RmrEndpointList) Size() int {
 
 func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
        for i := range eplist.Endpoints {
-               if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+               if eplist.Endpoints[i].Equal(ep) {
                        return false
                }
        }
@@ -107,7 +113,7 @@ func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
 
 func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
        for i := range eplist.Endpoints {
-               if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+               if eplist.Endpoints[i].Equal(ep) {
                        eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1]
                        eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0}
                        eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1]
@@ -120,7 +126,7 @@ func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
 func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
        var retval bool = false
        for i := range otheplist.Endpoints {
-               if eplist.DelEndpoint(&eplist.Endpoints[i]) {
+               if eplist.DelEndpoint(&otheplist.Endpoints[i]) {
                        retval = true
                }
        }
@@ -129,7 +135,7 @@ func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
 
 func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
        for i := range eplist.Endpoints {
-               if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+               if eplist.Endpoints[i].Equal(ep) {
                        return true
                }
        }
@@ -144,25 +150,6 @@ func NewRmrEndpoint(src string) *RmrEndpoint {
        return ep
 }
 
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type Action int
-
-func (act Action) String() string {
-       actions := [...]string{
-               "CREATE",
-               "UPDATE",
-               "NONE",
-               "DELETE",
-       }
-
-       if act < CREATE || act > DELETE {
-               return "UNKNOWN"
-       }
-       return actions[act]
-}
-
 //-----------------------------------------------------------------------------
 // To add own method for rmrparams
 //-----------------------------------------------------------------------------
index caeae7b..5e1cfac 100644 (file)
@@ -39,7 +39,7 @@ func TestRmrEndpoint(t *testing.T) {
                        testError(t, "Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort())
                }
                if expect.String() != res.String() {
-                       testError(t, "Endpoint string for value %s expected %s got %s", val, expect.String(), res.Get())
+                       testError(t, "Endpoint string for value %s expected %s got %s", val, expect.String(), res.String())
                }
 
        }
@@ -52,20 +52,71 @@ func TestRmrEndpoint(t *testing.T) {
        testEp(t, "", nil)
 }
 
-func TestAction(t *testing.T) {
+func TestRmrEndpointList(t *testing.T) {
+       epl := &RmrEndpointList{}
 
-       testActionString := func(t *testing.T, val int, str string) {
-               if Action(val).String() != str {
-                       testError(t, "String for value %d expected %s got %s", val, str, Action(val).String())
-               }
+       // Simple add / has / delete
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               testError(t, "RmrEndpointList: 8080 add failed")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true {
+               testError(t, "RmrEndpointList: 8080 duplicate add success")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               testError(t, "RmrEndpointList: 8081 add failed")
+       }
+       if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               testError(t, "RmrEndpointList: 8081 has failed")
+       }
+       if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               testError(t, "RmrEndpointList: 8081 del failed")
+       }
+       if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+               testError(t, "RmrEndpointList: 8081 has non existing success")
+       }
+       if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+               testError(t, "RmrEndpointList: 8081 del non existing success")
+       }
+       if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               testError(t, "RmrEndpointList: 8080 del failed")
+       }
+
+       // list delete
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               testError(t, "RmrEndpointList: 8080 add failed")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               testError(t, "RmrEndpointList: 8081 add failed")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+               testError(t, "RmrEndpointList: 8082 add failed")
+       }
+
+       epl2 := &RmrEndpointList{}
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false {
+               testError(t, "RmrEndpointList: othlist add 9080 failed")
+       }
+
+       if epl.DelEndpoints(epl2) == true {
+               testError(t, "RmrEndpointList: delete list not existing successs")
+       }
+
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               testError(t, "RmrEndpointList: othlist add 8080 failed")
+       }
+       if epl.DelEndpoints(epl2) == false {
+               testError(t, "RmrEndpointList: delete list 8080,9080 failed")
+       }
+
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               testError(t, "RmrEndpointList: othlist add 8081 failed")
+       }
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+               testError(t, "RmrEndpointList: othlist add 8082 failed")
+       }
+
+       if epl.DelEndpoints(epl2) == false {
+               testError(t, "RmrEndpointList: delete list 8080,8081,8082,9080 failed")
        }
 
-       testActionString(t, 0, "CREATE")
-       testActionString(t, 1, "UPDATE")
-       testActionString(t, 2, "NONE")
-       testActionString(t, 3, "DELETE")
-       testActionString(t, 5, "UNKNOWN")
-       testActionString(t, 6, "UNKNOWN")
-       testActionString(t, 7, "UNKNOWN")
-       testActionString(t, 10, "UNKNOWN")
 }
index 8403c93..16da422 100644 (file)
@@ -57,11 +57,11 @@ func (mc *testingSubmgrControl) wait_registry_empty(t *testing.T, secs int) bool
        return false
 }
 
-func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId int, secs int) bool {
+func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId uint32, secs int) bool {
        var subs *Subscription
        i := 1
        for ; i <= secs*2; i++ {
-               subs = mc.c.registry.GetSubscription(uint16(e2SubsId))
+               subs = mc.c.registry.GetSubscription(e2SubsId)
                if subs == nil {
                        return true
                }
@@ -75,11 +75,11 @@ func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId int, secs
        return false
 }
 
-func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId int, secs int) bool {
-       var trans *Transaction
+func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId uint32, secs int) bool {
+       var trans TransactionIf
        i := 1
        for ; i <= secs*2; i++ {
-               subs := mc.c.registry.GetSubscription(uint16(e2SubsId))
+               subs := mc.c.registry.GetSubscription(e2SubsId)
                if subs == nil {
                        return true
                }
@@ -97,13 +97,13 @@ func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId int
        return false
 }
 
-func (mc *testingSubmgrControl) get_subid(t *testing.T) uint16 {
+func (mc *testingSubmgrControl) get_subid(t *testing.T) uint32 {
        mc.c.registry.mutex.Lock()
        defer mc.c.registry.mutex.Unlock()
        return mc.c.registry.subIds[0]
 }
 
-func (mc *testingSubmgrControl) wait_subid_change(t *testing.T, origSubId uint16, secs int) (uint16, bool) {
+func (mc *testingSubmgrControl) wait_subid_change(t *testing.T, origSubId uint32, secs int) (uint32, bool) {
        i := 1
        for ; i <= secs*2; i++ {
                mc.c.registry.mutex.Lock()
index 40207b3..2e8bc0e 100644 (file)
@@ -56,7 +56,7 @@ func TestSubReqAndRouteNok(t *testing.T) {
        waiter.WaitResult(t)
 
        //Wait that subs is cleaned
-       mainCtrl.wait_subs_clean(t, int(newSubsId), 10)
+       mainCtrl.wait_subs_clean(t, newSubsId, 10)
 
        xappConn1.TestMsgCnt(t)
        xappConn2.TestMsgCnt(t)
@@ -585,7 +585,7 @@ func TestSubReqRetryNoRespSubDelRespInSubmgr(t *testing.T) {
        e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
 
        // Wait that subs is cleaned
-       mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 10)
+       mainCtrl.wait_subs_clean(t, delreq.RequestId.Seq, 10)
 
        xappConn1.TestMsgCnt(t)
        xappConn2.TestMsgCnt(t)
@@ -642,7 +642,7 @@ func TestSubReqTwoRetriesNoRespAtAllInSubmgr(t *testing.T) {
        delreq, _ := e2termConn.handle_e2term_subs_del_req(t)
 
        // Wait that subs is cleaned
-       mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 15)
+       mainCtrl.wait_subs_clean(t, delreq.RequestId.Seq, 15)
 
        xappConn1.TestMsgCnt(t)
        xappConn2.TestMsgCnt(t)
@@ -1156,7 +1156,7 @@ func TestSubReqAndSubDelNoAnswerSameActionParallel(t *testing.T) {
        e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
 
        //Wait that subs is cleaned
-       mainCtrl.wait_subs_clean(t, int(delreq1.RequestId.Seq), 10)
+       mainCtrl.wait_subs_clean(t, delreq1.RequestId.Seq, 10)
 
        xappConn1.TestMsgCnt(t)
        xappConn2.TestMsgCnt(t)
index 18a6264..273d159 100644 (file)
@@ -132,7 +132,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_resp(t *testing.T, req *
        }
 
        e2SubsResp.Set(resp)
-       xapp.Logger.Debug("%s", e2SubsResp.String())
+       xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsResp.String())
        packerr, packedMsg := e2SubsResp.Pack(nil)
        if packerr != nil {
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
@@ -198,7 +198,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_fail(t *testing.T, fpara
        xapp.Logger.Info("(%s) Send Subs Fail", e2termConn.desc)
 
        e2SubsFail.Set(fparams.fail)
-       xapp.Logger.Debug("%s", e2SubsFail.String())
+       xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsFail.String())
        packerr, packedMsg := e2SubsFail.Pack(nil)
        if packerr != nil {
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
@@ -276,7 +276,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_del_resp(t *testing.T, r
        resp.FunctionId = req.FunctionId
 
        e2SubsDelResp.Set(resp)
-       xapp.Logger.Debug("%s", e2SubsDelResp.String())
+       xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsDelResp.String())
        packerr, packedMsg := e2SubsDelResp.Pack(nil)
        if packerr != nil {
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
@@ -316,7 +316,7 @@ func (e2termConn *testingE2termStub) handle_e2term_subs_del_fail(t *testing.T, r
        resp.Cause.CauseVal = 4 // unspecified
 
        e2SubsDelFail.Set(resp)
-       xapp.Logger.Debug("%s", e2SubsDelFail.String())
+       xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsDelFail.String())
        packerr, packedMsg := e2SubsDelFail.Pack(nil)
        if packerr != nil {
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
index 331d14d..ac72b76 100644 (file)
@@ -194,10 +194,10 @@ func (xappConn *testingXappStub) handle_xapp_subs_req(t *testing.T, rparams *tes
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int {
+func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) uint32 {
        xapp.Logger.Info("(%s) handle_xapp_subs_resp", xappConn.desc)
        e2SubsResp := xapp_e2asnpacker.NewPackerSubscriptionResponse()
-       var e2SubsId int
+       var e2SubsId uint32
 
        //---------------------------------
        // xapp activity: Recv Subs Resp
@@ -207,14 +207,18 @@ func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xapp
                xappConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
                        testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
-                       return -1
+                       return 0
                } else if msg.Xid != trans.xid {
                        testError(t, "(%s) Received RIC_SUB_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
-                       return -1
+                       return 0
                } else {
                        packedData := &packer.PackedData{}
                        packedData.Buf = msg.Payload
-                       e2SubsId = msg.SubId
+                       if msg.SubId > 0 {
+                               e2SubsId = uint32(msg.SubId)
+                       } else {
+                               e2SubsId = 0
+                       }
                        unpackerr := e2SubsResp.UnPack(packedData)
 
                        if unpackerr != nil {
@@ -230,18 +234,18 @@ func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xapp
                }
        case <-time.After(15 * time.Second):
                testError(t, "(%s) Not Received RIC_SUB_RESP within 15 secs", xappConn.desc)
-               return -1
+               return 0
        }
-       return -1
+       return 0
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) int {
+func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) uint32 {
        xapp.Logger.Info("(%s) handle_xapp_subs_fail", xappConn.desc)
        e2SubsFail := xapp_e2asnpacker.NewPackerSubscriptionFailure()
-       var e2SubsId int
+       var e2SubsId uint32
 
        //-------------------------------
        // xapp activity: Recv Subs Fail
@@ -251,14 +255,18 @@ func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xapp
                xappConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_FAILURE"] {
                        testError(t, "(%s) Received RIC_SUB_FAILURE wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_FAILURE", xapp.RicMessageTypeToName[msg.Mtype])
-                       return -1
+                       return 0
                } else if msg.Xid != trans.xid {
                        testError(t, "(%s) Received RIC_SUB_FAILURE wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
-                       return -1
+                       return 0
                } else {
                        packedData := &packer.PackedData{}
                        packedData.Buf = msg.Payload
-                       e2SubsId = msg.SubId
+                       if msg.SubId > 0 {
+                               e2SubsId = uint32(msg.SubId)
+                       } else {
+                               e2SubsId = 0
+                       }
                        unpackerr := e2SubsFail.UnPack(packedData)
 
                        if unpackerr != nil {
@@ -274,15 +282,15 @@ func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xapp
                }
        case <-time.After(15 * time.Second):
                testError(t, "(%s) Not Received RIC_SUB_FAILURE within 15 secs", xappConn.desc)
-               return -1
+               return 0
        }
-       return -1
+       return 0
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction {
+func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId uint32) *xappTransaction {
        xapp.Logger.Info("(%s) handle_xapp_subs_del_req", xappConn.desc)
        e2SubsDelReq := xapp_e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
@@ -293,7 +301,7 @@ func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans
 
        req := &e2ap.E2APSubscriptionDeleteRequest{}
        req.RequestId.Id = 1
-       req.RequestId.Seq = uint32(e2SubsId)
+       req.RequestId.Seq = e2SubsId
        req.FunctionId = 1
 
        e2SubsDelReq.Set(req)
@@ -311,7 +319,7 @@ func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans
 
        params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_DEL_REQ
-       params.SubId = e2SubsId
+       params.SubId = int(e2SubsId)
        params.Payload = packedMsg.Buf
        params.Meid = trans.meid
        params.Xid = trans.xid