dc8c5e0493c0adb708a6d76411c1d386223cf24a
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import "C"
23
24 import (
25         "errors"
26         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
27         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
28         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29         httptransport "github.com/go-openapi/runtime/client"
30         "github.com/go-openapi/strfmt"
31         "github.com/spf13/viper"
32         "math/rand"
33         "strconv"
34         "time"
35 )
36
37 type Control struct {
38         e2ap        *E2ap
39         registry    *Registry
40         rtmgrClient *RtmgrClient
41         tracker     *Tracker
42         rcChan      chan *xapp.RMRParams
43 }
44
45 type RMRMeid struct {
46         PlmnID string
47         EnbID  string
48 }
49
50 var seedSN uint16
51 var SubscriptionReqChan = make(chan SubRouteInfo, 10)
52
53 const (
54         CREATE Action = 0
55         MERGE  Action = 1
56         DELETE Action = 3
57 )
58
59 func init() {
60         viper.AutomaticEnv()
61         viper.SetEnvPrefix("submgr")
62         viper.AllowEmptyEnv(true)
63         seedSN = uint16(viper.GetInt("seed_sn"))
64         if seedSN == 0 {
65                 rand.Seed(time.Now().UnixNano())
66                 seedSN = uint16(rand.Intn(65535))
67         }
68         if seedSN > 65535 {
69                 seedSN = 0
70         }
71         xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
72 }
73
74 func NewControl() Control {
75         registry := new(Registry)
76         registry.Initialize(seedSN)
77
78         tracker := new(Tracker)
79         tracker.Init()
80
81         transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
82         client := rtmgrclient.New(transport, strfmt.Default)
83         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
84         deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
85         rtmgrClient := RtmgrClient{client, handle, deleteHandle}
86
87         return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
88 }
89
90 func (c *Control) Run() {
91         go c.controlLoop()
92         xapp.Run(c)
93 }
94
95 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
96         c.rcChan <- rp
97         return
98 }
99
100 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
101         if !xapp.Rmr.Send(params, false) {
102                 err = errors.New("rmr.Send() failed")
103         }
104         return
105 }
106
107 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
108         if !xapp.Rmr.Send(params, true) {
109                 err = errors.New("rmr.Send() failed")
110         }
111         return
112 }
113
114 func (c *Control) controlLoop() {
115         for {
116                 msg := <-c.rcChan
117                 switch msg.Mtype {
118                 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
119                         c.handleSubscriptionRequest(msg)
120                 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
121                         c.handleSubscriptionResponse(msg)
122                 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
123                         c.handleSubscriptionDeleteRequest(msg)
124                 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
125                         c.handleSubscriptionDeleteResponse(msg)
126                 default:
127                         err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
128                         xapp.Logger.Error("Unknown message type: %v", err)
129                 }
130         }
131 }
132
133 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
134         payloadSeqNum, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
135         if err != nil {
136                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
137                 return
138         }
139         xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
140
141         /* Reserve a sequence number and set it in the payload */
142         newSubId := c.registry.ReserveSequenceNumber()
143
144         _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
145         if err != nil {
146                 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
147                 return
148         }
149
150         srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
151         if err != nil {
152                 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
153                 return
154         }
155
156         /* Create transatcion records for every subscription request */
157         xactKey := TransactionKey{newSubId, CREATE}
158         xactValue := Transaction{*srcAddr, *srcPort, params}
159         err = c.tracker.TrackTransaction(xactKey, xactValue)
160         if err != nil {
161                 xapp.Logger.Error("Failed to create a Subscription Request transaction record due to %v", err)
162                 return
163         }
164
165         /* Update routing manager about the new subscription*/
166         subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
167         go c.rtmgrClient.SubscriptionRequestUpdate()
168         SubscriptionReqChan <- subRouteAction
169
170         // Setting new subscription ID in the RMR header
171         params.SubId = int(newSubId)
172
173         xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(newSubId))
174         c.rmrSend(params)
175         xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
176         return
177 }
178
179 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
180         payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
181         if err != nil {
182                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
183                 return
184         }
185         xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
186         if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
187                 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payloadSeqNum)) + " in Subscritpion Response. Message discarded.")
188                 return
189         }
190         c.registry.setSubscriptionToConfirmed(payloadSeqNum)
191         xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
192         transaction, err := c.tracker.completeTransaction(payloadSeqNum, CREATE)
193         if err != nil {
194                 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
195                 return
196         }
197         xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
198         params.Mbuf = transaction.OrigParams.Mbuf
199         c.rmrReplyToSender(params)
200         return
201 }
202
203 func (act Action) String() string {
204         actions := [...]string{
205                 "CREATE",
206                 "MERGE",
207                 "DELETE",
208         }
209
210         if act < CREATE || act > DELETE {
211                 return "Unknown"
212         }
213         return actions[act]
214 }
215
216 func (act Action) valid() bool {
217         switch act {
218         case CREATE, MERGE, DELETE:
219                 return true
220         default:
221                 return false
222         }
223 }
224
225 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
226         payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
227         if err != nil {
228                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
229                 return
230         }
231         xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
232         if c.registry.IsValidSequenceNumber(payloadSeqNum) {
233                 c.registry.deleteSubscription(payloadSeqNum)
234                 trackErr := c.trackDeleteTransaction(params, payloadSeqNum)
235                 if trackErr != nil {
236                         xapp.Logger.Error("Failed to create a Subscription Delete Request transaction record due to %v", trackErr)
237                         return trackErr
238                 }
239         }
240         xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payloadSeqNum))
241         c.rmrSend(params)
242         return
243 }
244
245 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
246         srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
247         if err != nil {
248                 xapp.Logger.Error("Failed to update routing-manager about the subscription delete request with reason: %s", err)
249         }
250         xactKey := TransactionKey{payloadSeqNum, DELETE}
251         xactValue := Transaction{*srcAddr, *srcPort, params}
252         err = c.tracker.TrackTransaction(xactKey, xactValue)
253         return
254 }
255
256 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
257         payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
258         if err != nil {
259                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
260                 return
261         }
262         var transaction, _ = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
263         subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
264         go c.rtmgrClient.SubscriptionRequestUpdate()
265         SubscriptionReqChan <- subRouteAction
266
267         xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
268         if c.registry.releaseSequenceNumber(payloadSeqNum) {
269                 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
270                 if err != nil {
271                         xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
272                         return
273                 }
274                 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
275                 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
276                 params.Mbuf = transaction.OrigParams.Mbuf
277                 c.rmrReplyToSender(params)
278         }
279         return
280 }