X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=b398daaf1488c882dc5422ca8ba9c34eb3d64595;hb=c65f5b0d9ad73ce3f66eabc22e80244546e97238;hp=34dd6a0c710a1199d28f805b8b7693cdf2c47e94;hpb=f1d0eb6a82e11f14f60e3636d526299ced0173ea;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 34dd6a0..b398daa 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -30,25 +30,25 @@ import ( "github.com/go-openapi/strfmt" "github.com/spf13/viper" "math/rand" + "sync" "time" - "sync" ) var subReqTime time.Duration = 5 * time.Second var SubDelReqTime time.Duration = 5 * time.Second type Control struct { - e2ap *E2ap - registry *Registry - rtmgrClient *RtmgrClient - tracker *Tracker - timerMap *TimerMap - rmrSendMutex *sync.Mutex + e2ap *E2ap + registry *Registry + rtmgrClient *RtmgrClient + tracker *Tracker + timerMap *TimerMap + rmrSendMutex sync.Mutex } type RMRMeid struct { - PlmnID string - EnbID string + PlmnID string + EnbID string RanName string } @@ -77,7 +77,7 @@ func init() { xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN) } -func NewControl() Control { +func NewControl() *Control { registry := new(Registry) registry.Initialize(seedSN) @@ -87,15 +87,18 @@ func NewControl() Control { timerMap := new(TimerMap) timerMap.Init() - rmrSendMutex := &sync.Mutex{} - transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) client := rtmgrclient.New(transport, strfmt.Default) handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) rtmgrClient := RtmgrClient{client, handle, deleteHandle} - return Control{new(E2ap), registry, &rtmgrClient, tracker, timerMap, rmrSendMutex} + return &Control{e2ap: new(E2ap), + registry: registry, + rtmgrClient: &rtmgrClient, + tracker: tracker, + timerMap: timerMap, + } } func (c *Control) Run() { @@ -105,12 +108,12 @@ func (c *Control) Run() { func (c *Control) rmrSend(params *xapp.RMRParams) (err error) { status := false i := 1 - for ; i <= 10 && status == false; i++ { + for ; i <= 10 && status == false; i++ { c.rmrSendMutex.Lock() status = xapp.Rmr.Send(params, false) c.rmrSendMutex.Unlock() if status == false { - xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid) + xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid) time.Sleep(500 * time.Millisecond) } } @@ -147,15 +150,15 @@ func (c *Control) Consume(msg *xapp.RMRParams) (err error) { } func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { - xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) + xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil /* Reserve a sequence number and set it in the payload */ newSubId, isIdValid := c.registry.ReserveSequenceNumber() if isIdValid != true { - xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s",params.SubId, params.Xid) - return + xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) + return } err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) @@ -182,6 +185,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { /* Update routing manager about the new subscription*/ subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId} xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) + err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this SubReq msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -190,7 +194,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { // Setting new subscription ID in the RMR header params.SubId = int(newSubId) - xapp.Logger.Info("Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid) + xapp.Logger.Info("Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid) err = c.rmrSend(params) if err != nil { xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -202,7 +206,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { } func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { - xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) + xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil @@ -211,10 +215,10 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } - xapp.Logger.Info("SubResp: Received payloadSeqNum: %v",payloadSeqNum) + xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum) if !c.registry.IsValidSequenceNumber(payloadSeqNum) { - xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId) + xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId) return } @@ -224,15 +228,15 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE) if err != nil { - xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %V, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - - xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v",params.Mtype, params.SubId, params.Meid) + params.SubId = int(payloadSeqNum) + params.Xid = transaction.OrigParams.Xid + + xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid) err = c.rmrReplyToSender(params) if err != nil { xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -248,7 +252,7 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { } func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { - xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) + xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil @@ -263,8 +267,8 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE) - if err != nil { - xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v: %s", err, params.SubId) + if err != nil { + xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) @@ -272,7 +276,7 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { params.SubId = int(payloadSeqNum) params.Xid = transaction.OrigParams.Xid - xapp.Logger.Info("Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) + xapp.Logger.Info("Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) err = c.rmrReplyToSender(params) if err != nil { xapp.Logger.Error("Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -304,9 +308,9 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int) { newSubId := uint16(nbrId) xapp.Logger.Info("SubReq timer expired. newSubId: %v", newSubId) -// var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something -// var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something -// c.sendSubscriptionFailure(newSubId, causeContent, causeVal) + // var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something + // var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something + // c.sendSubscriptionFailure(newSubId, causeContent, causeVal) } /* @@ -324,13 +328,13 @@ func (c *Control) sendSubscriptionFailure(subId uint16, causeContent uint8, caus params.SubId = int(subId) params.Meid = transaction.OrigParams.Meid params.Xid = transaction.OrigParams.Xid - + // newPayload, packErr := c.e2ap.PackSubscriptionFailure(transaction.OrigParams.Payload, subId, causeContent, causeVal) // if packErr != nil { // xapp.Logger.Error("SendSubFail: PackSubscriptionFailure() due to %v", packErr) // return // } - + newPayload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0") // Temporary solution params.PayloadLen = len(newPayload) @@ -392,7 +396,7 @@ func (act Action) valid() bool { } func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { - xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) + xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil @@ -414,8 +418,8 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) return } - - xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) + + xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) c.rmrSend(params) if err != nil { xapp.Logger.Error("SubDelReq: Failed to send request to E2T. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -437,13 +441,13 @@ func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum u } func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) { - xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) + xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum) @@ -452,19 +456,19 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE) - if err != nil { + if err != nil { xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) + params.SubId = int(payloadSeqNum) + params.Xid = transaction.OrigParams.Xid + xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) err = c.rmrReplyToSender(params) if err != nil { xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) -// return + // return } time.Sleep(3 * time.Second) @@ -492,13 +496,13 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err } func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { - xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) + xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum) @@ -507,19 +511,19 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE) - if err != nil { + if err != nil { xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId) return } xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - xapp.Logger.Info("Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) + params.SubId = int(payloadSeqNum) + params.Xid = transaction.OrigParams.Xid + xapp.Logger.Info("Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) err = c.rmrReplyToSender(params) if err != nil { xapp.Logger.Error("Failed to send SubDelFail to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) -// return + // return } time.Sleep(3 * time.Second) @@ -540,7 +544,7 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { return } } else { - xapp.Logger.Error("SubDelFail: Failed to release sequency number. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } return @@ -549,9 +553,9 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int) { newSubId := uint16(nbrId) xapp.Logger.Info("SubDelReq timer expired. newSubId: %v", newSubId) -// var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something -// var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something -// c.sendSubscriptionDeleteFailure(newSubId, causeContent, causeVal) + // var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something + // var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something + // c.sendSubscriptionDeleteFailure(newSubId, causeContent, causeVal) } /* @@ -568,7 +572,7 @@ func (c *Control) sendSubscriptionDeleteFailure(subId uint16, causeContent uint8 params.SubId = int(subId) params.Meid = transaction.OrigParams.Meid params.Xid = transaction.OrigParams.Xid - + // newPayload, packErr := c.e2ap.PackSubscriptionDeleteFailure(transaction.OrigParams.Payload, subId, causeContent, causeVal) // if packErr != nil { // xapp.Logger.Error("SendSubDelFail: PackSubscriptionDeleteFailure(). Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)) @@ -585,9 +589,9 @@ func (c *Control) sendSubscriptionDeleteFailure(subId uint16, causeContent uint8 if err != nil { xapp.Logger.Error("SendSubDelFail: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) } - + time.Sleep(3 * time.Second) - + xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort) xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) @@ -610,4 +614,4 @@ func (c *Control) sendSubscriptionDeleteFailure(subId uint16, causeContent uint8 } return } -*/ \ No newline at end of file +*/