X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=7d180ad6d914d7303bacb313b3d01a61d1eb0a7d;hb=c92b421ec9f89e77df36422987e478ed8db85299;hp=9f90252322d6cbd7deef6bf105f4e8b9f0ba069d;hpb=fca5c3af85c440f6456dcb90f854033fb91349c1;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 9f90252..7d180ad 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -21,14 +21,20 @@ package control import ( "fmt" + "net/http" + "os" + "strconv" + "strings" + "time" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" + "github.com/gorilla/mux" "github.com/spf13/viper" - "time" ) //----------------------------------------------------------------------------- @@ -59,14 +65,17 @@ var e2tSubDelReqTime time.Duration var e2tRecvMsgTimeout time.Duration var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry +var readSubsFromDb string type Control struct { *xapp.RMRClient e2ap *E2ap registry *Registry tracker *Tracker + db Sdlnterface //subscriber *xapp.Subscriber - CntRecvMsg uint64 + CntRecvMsg uint64 + ResetTestFlag bool } type RMRMeid struct { @@ -75,6 +84,9 @@ type RMRMeid struct { RanName string } +type SubmgrRestartTestEvent struct{} +type SubmgrRestartUpEvent struct{} + func init() { xapp.Logger.Info("SUBMGR") viper.AutomaticEnv() @@ -84,6 +96,54 @@ func init() { func NewControl() *Control { + ReadConfigParameters() + transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) + rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)} + + registry := new(Registry) + registry.Initialize() + registry.rtmgrClient = &rtmgrClient + + tracker := new(Tracker) + tracker.Init() + + //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) + + c := &Control{e2ap: new(E2ap), + registry: registry, + tracker: tracker, + db: CreateSdl(), + //subscriber: subscriber, + } + + // Register REST handler for testing support + xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST") + + go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler) + //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) + + if readSubsFromDb == "false" { + return c + } + + // Read subscriptions from db + xapp.Logger.Info("Reading subscriptions from db") + subIds, register, err := c.ReadAllSubscriptionsFromSdl() + if err != nil { + xapp.Logger.Error("%v", err) + } else { + c.registry.subIds = subIds + c.registry.register = register + c.HandleUncompletedSubscriptions(register) + } + return c +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func ReadConfigParameters() { + // viper.GetDuration returns nanoseconds e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000 if e2tSubReqTimeout == 0 { @@ -111,26 +171,26 @@ func NewControl() *Control { } xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount) - transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) - rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)} - - registry := new(Registry) - registry.Initialize() - registry.rtmgrClient = &rtmgrClient - - tracker := new(Tracker) - tracker.Init() - - //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) + readSubsFromDb = viper.GetString("controls.readSubsFromDb") + if readSubsFromDb == "" { + readSubsFromDb = "true" + } + xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb) +} - c := &Control{e2ap: new(E2ap), - registry: registry, - tracker: tracker, - //subscriber: subscriber, +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) { + + xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register)) + for subId, subs := range register { + if subs.SubRespRcvd == false { + subs.NoRespToXapp = true + xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId) + c.SendSubscriptionDeleteReq(subs) + } } - go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler) - //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) - return c } func (c *Control) ReadyCB(data interface{}) { @@ -164,14 +224,47 @@ func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params inte return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented") } -func (c *Control) SubscriptionDeleteHandler(string) error { - return fmt.Errorf("Subscription rest interface not implemented") +func (c *Control) SubscriptionDeleteHandler(s string) error { + return nil } func (c *Control) QueryHandler() (models.SubscriptionList, error) { return c.registry.QueryHandler() } +func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) { + + xapp.Logger.Info("TestRestHandler() called") + + pathParams := mux.Vars(r) + s := pathParams["testId"] + + // This can be used to delete single subscription from db + if contains := strings.Contains(s, "deletesubid="); contains == true { + var splits = strings.Split(s, "=") + if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil { + xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId) + c.RemoveSubscriptionFromSdl(uint32(subId)) + return + } + } + + // This can be used to remove all subscriptions db from + if s == "emptydb" { + xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called") + c.RemoveAllSubscriptionsFromSdl() + return + } + + // This is meant to cause submgr's restart in testing + if s == "restart" { + xapp.Logger.Info("os.Exit(1) called") + os.Exit(1) + } + + xapp.Logger.Info("Unsupported rest command received %s", s) +} + //------------------------------------------------------------------- // //------------------------------------------------------------------- @@ -270,7 +363,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { } //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it? - subs, err := c.registry.AssignToSubscription(trans, subReqMsg) + subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag) if err != nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) return @@ -281,7 +374,6 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { // go c.handleSubscriptionCreate(subs, trans) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side - err = nil if event != nil { switch themsg := event.(type) { @@ -344,7 +436,12 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) - // Whatever is received send ok delete response + if subs.NoRespToXapp == true { + // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions + return + } + + // Whatever is received success, fail or timeout, send successful delete response subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{} subDelRespMsg.RequestId = subs.GetReqId().RequestId subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId @@ -362,6 +459,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { //------------------------------------------------------------------- func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) { + var removeSubscriptionFromDb bool = false trans := c.tracker.NewSubsTransaction(subs) subs.WaitTransactionTurn(trans) defer subs.ReleaseTransactionTurn(trans) @@ -386,7 +484,9 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: subRfMsg, valid = subs.SetCachedResponse(event, true) + subs.SubRespRcvd = true case *e2ap.E2APSubscriptionFailure: + removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(event, false) doRetry = true for _, item := range themsg.ActionNotAdmittedList.Items { @@ -397,13 +497,17 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran } xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans)) c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) + case *SubmgrRestartTestEvent: + // This simulates that no response has been received and after restart subscriptions are restored from db + xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case") + return default: xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) + removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(nil, false) c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) } } - xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } else { xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) @@ -411,8 +515,10 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) if valid == false { - c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) + c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c) } + + c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb) parentTrans.SendEvent(subRfMsg, 0) } @@ -440,7 +546,8 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...)) - c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) + c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c) + c.registry.UpdateSubscriptionToDb(subs, c) parentTrans.SendEvent(nil, 0) } @@ -460,12 +567,19 @@ func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transact return event } + // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart + c.WriteSubscriptionToDb(subs) for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ { desc := fmt.Sprintf("(retry %d)", retries) c.rmrSendToE2T(desc, subs, trans) - event, timedOut = trans.WaitEvent(e2tSubReqTimeout) - if timedOut { - continue + if subs.DoNotWaitSubResp == false { + event, timedOut = trans.WaitEvent(e2tSubReqTimeout) + if timedOut { + continue + } + } else { + // Simulating case where subscrition request has been sent but response has not been received before restart + event = &SubmgrRestartTestEvent{} } break } @@ -644,3 +758,76 @@ func typeofSubsMessage(v interface{}) string { return "Unknown" } } + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) WriteSubscriptionToDb(subs *Subscription) { + xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId) + err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs) + if err != nil { + xapp.Logger.Error("%v", err) + } +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) { + + if removeSubscriptionFromDb == true { + // Subscription was written in db already when subscription request was sent to BTS, except for merged request + c.RemoveSubscriptionFromDb(subs) + } else { + // Update is needed for successful response and merge case here + if subs.RetryFromXapp == false { + c.WriteSubscriptionToDb(subs) + } + } + subs.RetryFromXapp = false +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) { + xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId) + err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId) + if err != nil { + xapp.Logger.Error("%v", err) + } +} + +func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { + + xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId) + + // Send delete for every endpoint in the subscription + subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{} + subDelReqMsg.RequestId = subs.GetReqId().RequestId + subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId + mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg) + if err != nil { + xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err)) + return + } + for _, endPoint := range subs.EpList.Endpoints { + params := &xapp.RMRParams{} + params.Mtype = mType + params.SubId = int(subs.GetReqId().InstanceId) + params.Xid = "" + params.Meid = subs.Meid + params.Src = endPoint.String() + params.PayloadLen = len(payload.Buf) + params.Payload = payload.Buf + params.Mbuf = nil + + if params == nil { + xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil") + return + } + + subs.DeleteFromDb = true + c.handleXAPPSubscriptionDeleteRequest(params) + } +}