X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=de632167592b654deb197b05148e9c4a271e7896;hb=refs%2Fchanges%2F38%2F7438%2F1;hp=08000b0505e1c802b5d1b84e15a6aebf8fb7d63d;hpb=e1af87f0759201f64804eff5911c4e6c0e859be9;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 08000b0..de63216 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -25,6 +25,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" @@ -201,7 +202,7 @@ func (c *Control) ReadE2Subscriptions() error { } else { c.registry.subIds = subIds c.registry.register = register - c.HandleUncompletedSubscriptions(register) + go c.HandleUncompletedSubscriptions(register) return nil } } @@ -213,6 +214,8 @@ func (c *Control) ReadE2Subscriptions() error { // //------------------------------------------------------------------- func (c *Control) ReadRESTSubscriptions() error { + + xapp.Logger.Debug("ReadRESTSubscriptions()") var err error var restSubscriptions map[string]*RESTSubscription for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ { @@ -222,6 +225,12 @@ func (c *Control) ReadRESTSubscriptions() error { xapp.Logger.Error("%v", err) <-time.After(1 * time.Second) } else { + // Fix REST subscriptions ongoing status after restart + for restSubId, restSubscription := range restSubscriptions { + restSubscription.SubReqOngoing = false + restSubscription.SubDelReqOngoing = false + c.WriteRESTSubscriptionToSdl(restSubId, restSubscription) + } c.registry.restSubscriptions = restSubscriptions return nil } @@ -238,7 +247,7 @@ func (c *Control) ReadConfigParameters(f string) { xapp.Logger.Debug("ReadConfigParameters") c.LoggerLevel = int(xapp.Logger.GetLevel()) - xapp.Logger.Debug("LoggerLevel= %v", c.LoggerLevel) + xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel) c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel) // viper.GetDuration returns nanoseconds @@ -461,6 +470,7 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript if duplicate { err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum) xapp.Logger.Debug("%s", err) + c.registry.DeleteRESTSubscription(&restSubId) c.UpdateCounter(cRestSubRespToXapp) return &subResp, common.SubscribeCreatedCode } @@ -469,6 +479,7 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p) if err != nil { xapp.Logger.Error("%s", err) + c.registry.DeleteRESTSubscription(&restSubId) return nil, common.SubscribeBadRequestCode } go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives) @@ -520,7 +531,7 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) { c.SubscriptionProcessingStartDelay() - xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests)) + xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests)) var xAppEventInstanceID int64 var e2EventInstanceID int64 @@ -549,6 +560,10 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription trans.Release() if err != nil { + if err.Error() == "TEST: restart event received" { + // This is just for UT cases. Stop here subscription processing + return + } c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo) } else { e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId) @@ -599,7 +614,7 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e // Wake subs request // subs.OngoingReqCount++ - go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives) + go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side subs.OngoingReqCount-- @@ -615,24 +630,27 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e c.RemoveSubscriptionFromDb(subs) err = fmt.Errorf("E2 interface down") errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") - return nil, &errorInfo, err } case *e2ap.E2APSubscriptionFailure: err = fmt.Errorf("E2 SubscriptionFailure received") errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") - return nil, &errorInfo, err case *PackSubscriptionRequestErrortEvent: err = fmt.Errorf("E2 SubscriptionRequest pack failure") - return nil, &themsg.ErrorInfo, err + errorInfo = themsg.ErrorInfo case *SDLWriteErrortEvent: err = fmt.Errorf("SDL write failure") - return nil, &themsg.ErrorInfo, err + errorInfo = themsg.ErrorInfo + case *SubmgrRestartTestEvent: + err = fmt.Errorf("TEST: restart event received") + xapp.Logger.Debug("%s", err) + return nil, &errorInfo, err default: err = fmt.Errorf("Unexpected E2 subscription response received") errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") break } } else { + // Timer expiry err = fmt.Errorf("E2 subscription response timeout") errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout) if subs.PolicyUpdate == true { @@ -681,6 +699,7 @@ func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSu c.UpdateCounter(cRestSubFailNotifToXapp) xapp.Subscription.Notify(resp, *clientEndpoint) + // E2 is down. Delete completely processed request safely now if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false { c.registry.DeleteRESTSubscription(restSubId) c.RemoveRESTSubscriptionFromDb(*restSubId) @@ -715,6 +734,7 @@ func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubs c.UpdateCounter(cRestSubNotifToXapp) xapp.Subscription.Notify(resp, *clientEndpoint) + // E2 is down. Delete completely processed request safely now if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false { c.registry.DeleteRESTSubscription(restSubId) c.RemoveRESTSubscriptionFromDb(*restSubId) @@ -746,8 +766,8 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { return common.UnsubscribeBadRequestCode } else if restSubscription.SubDelReqOngoing == true { // Previous request for same restSubId still ongoing - c.UpdateCounter(cRestSubDelFailToXapp) - return common.UnsubscribeBadRequestCode + c.UpdateCounter(cRestSubDelRespToXapp) + return common.UnsubscribeNoContentCode } } } @@ -756,7 +776,7 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { go func() { xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds) for _, instanceId := range restSubscription.InstanceIds { - xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId) + xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0) if err != nil { xapp.Logger.Error("%s", err.Error()) @@ -777,7 +797,7 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { //------------------------------------------------------------------- // //------------------------------------------------------------------- -func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) { +func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) { var xAppEventInstanceID int64 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId}) @@ -805,7 +825,7 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, // Wake subs delete // subs.OngoingDelCount++ - go c.handleSubscriptionDelete(subs, trans) + go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side subs.OngoingDelCount-- @@ -987,7 +1007,7 @@ func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *Transaction e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil) subs.OngoingReqCount++ - go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives) + go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side subs.OngoingReqCount-- var err error @@ -1058,7 +1078,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { // Wake subs delete // subs.OngoingDelCount++ - go c.handleSubscriptionDelete(subs, trans) + go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side subs.OngoingDelCount-- @@ -1088,7 +1108,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { //------------------------------------------------------------------- // SUBS CREATE Handling //------------------------------------------------------------------- -func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) { +func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) { var event interface{} = nil var removeSubscriptionFromDb bool = false @@ -1107,36 +1127,43 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran subRfMsg, valid = subs.SetCachedResponse(event, true) subs.SubRespRcvd = true case *e2ap.E2APSubscriptionFailure: - removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(event, false) xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) case *SubmgrRestartTestEvent: - // This simulates that no response has been received and after restart subscriptions are restored from db + // This is used to simulate that no response has been received and after restart, subscriptions are restored from db xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case") + subRfMsg, valid = subs.SetCachedResponse(event, false) + parentTrans.SendEvent(subRfMsg, 0) + return case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent: subRfMsg, valid = subs.SetCachedResponse(event, false) default: + // Timer expiry if subs.PolicyUpdate == false { xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) - removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(nil, false) c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) + } else { + subRfMsg, valid = subs.SetCachedResponse(nil, true) } } xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } else { xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } + if valid == false { + removeSubscriptionFromDb = true + } err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb) if err != nil { - subRfMsg, valid = subs.SetCachedResponse(event, false) + valid = false c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) } //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) if valid == false { - c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c) + c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) } parentTrans.SendEvent(subRfMsg, 0) @@ -1146,7 +1173,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran // SUBS DELETE Handling //------------------------------------------------------------------- -func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) { +func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) { trans := c.tracker.NewSubsTransaction(subs) subs.WaitTransactionTurn(trans) @@ -1167,8 +1194,7 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...)) - c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c) - c.registry.UpdateSubscriptionToDb(subs, c) + c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) parentTrans.SendEvent(nil, 0) } @@ -1186,7 +1212,7 @@ func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transact subReqMsg.RequestId.Id = ricRequestorId trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg) if err != nil { - xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans)) + xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans)) return &PackSubscriptionRequestErrortEvent{ ErrorInfo{ ErrorSource: models.SubscriptionInstanceErrorSourceASN1, @@ -1293,6 +1319,7 @@ func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs)) return } + xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans) sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout) if sendOk == false { err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut) @@ -1496,6 +1523,11 @@ func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) { func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { + if c.UTTesting == true { + // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests + c.registry.mutex = new(sync.Mutex) + } + const ricRequestorId = 123 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)