2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
27 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
40 rtmgrClient *RtmgrClient
42 rcChan chan *xapp.RMRParams
51 var SubscriptionReqChan = make(chan SubRouteInfo, 10)
61 viper.SetEnvPrefix("submgr")
62 viper.AllowEmptyEnv(true)
63 seedSN = uint16(viper.GetInt("seed_sn"))
65 rand.Seed(time.Now().UnixNano())
66 seedSN = uint16(rand.Intn(65535))
71 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
74 func NewControl() Control {
75 registry := new(Registry)
76 registry.Initialize(seedSN)
78 tracker := new(Tracker)
81 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
82 client := rtmgrclient.New(transport, strfmt.Default)
83 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
84 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
85 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
87 return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
90 func (c *Control) Run() {
95 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
100 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
101 if !xapp.Rmr.Send(params, false) {
102 err = errors.New("rmr.Send() failed")
107 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
108 if !xapp.Rmr.Send(params, true) {
109 err = errors.New("rmr.Send() failed")
114 func (c *Control) controlLoop() {
118 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
119 c.handleSubscriptionRequest(msg)
120 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
121 c.handleSubscriptionResponse(msg)
122 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
123 c.handleSubscriptionDeleteRequest(msg)
124 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
125 c.handleSubscriptionDeleteResponse(msg)
127 err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
128 xapp.Logger.Error("Unknown message type: %v", err)
133 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
134 payloadSeqNum, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
136 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
139 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
141 /* Reserve a sequence number and set it in the payload */
142 newSubId := c.registry.ReserveSequenceNumber()
144 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
146 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
150 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
152 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
156 /* Create transatcion records for every subscription request */
157 xactKey := TransactionKey{newSubId, CREATE}
158 xactValue := Transaction{*srcAddr, *srcPort, params}
159 err = c.tracker.TrackTransaction(xactKey, xactValue)
161 xapp.Logger.Error("Failed to create a Subscription Request transaction record due to %v", err)
165 /* Update routing manager about the new subscription*/
166 subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
167 go c.rtmgrClient.SubscriptionRequestUpdate()
168 SubscriptionReqChan <- subRouteAction
170 // Setting new subscription ID in the RMR header
171 params.SubId = int(newSubId)
173 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(newSubId))
175 xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
179 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
180 payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
182 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
185 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
186 if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
187 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payloadSeqNum)) + " in Subscritpion Response. Message discarded.")
190 c.registry.setSubscriptionToConfirmed(payloadSeqNum)
191 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
192 transaction, err := c.tracker.completeTransaction(payloadSeqNum, CREATE)
194 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
197 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
198 params.Mbuf = transaction.OrigParams.Mbuf
199 c.rmrReplyToSender(params)
203 func (act Action) String() string {
204 actions := [...]string{
210 if act < CREATE || act > DELETE {
216 func (act Action) valid() bool {
218 case CREATE, MERGE, DELETE:
225 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
226 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
228 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
231 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
232 if c.registry.IsValidSequenceNumber(payloadSeqNum) {
233 c.registry.deleteSubscription(payloadSeqNum)
234 trackErr := c.trackDeleteTransaction(params, payloadSeqNum)
236 xapp.Logger.Error("Failed to create a Subscription Delete Request transaction record due to %v", trackErr)
240 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payloadSeqNum))
245 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
246 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
248 xapp.Logger.Error("Failed to update routing-manager about the subscription delete request with reason: %s", err)
250 xactKey := TransactionKey{payloadSeqNum, DELETE}
251 xactValue := Transaction{*srcAddr, *srcPort, params}
252 err = c.tracker.TrackTransaction(xactKey, xactValue)
256 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
257 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
259 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
262 var transaction, _ = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
263 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
264 go c.rtmgrClient.SubscriptionRequestUpdate()
265 SubscriptionReqChan <- subRouteAction
267 xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
268 if c.registry.releaseSequenceNumber(payloadSeqNum) {
269 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
271 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
274 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
275 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
276 params.Mbuf = transaction.OrigParams.Mbuf
277 c.rmrReplyToSender(params)