submgr 0.10.3 published
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 /*
23 #include <rmr/RIC_message_types.h>
24 #include <rmr/rmr.h>
25
26 #cgo CFLAGS: -I../
27 #cgo LDFLAGS: -lrmr_nng -lnng
28 */
29 import "C"
30
31 import (
32         "errors"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         "github.com/spf13/viper"
35         "github.com/go-openapi/strfmt"
36         httptransport "github.com/go-openapi/runtime/client"
37         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
39         "math/rand"
40         "strconv"
41         "time"
42 )
43
44 type Control struct {
45         e2ap        *E2ap
46         registry    *Registry
47         rtmgrClient *RtmgrClient
48         tracker     *Tracker
49         rc_chan     chan *xapp.RMRParams
50 }
51
52 type RMRMeid struct {
53         PlmnID string
54         EnbID  string
55 }
56
57 var SEEDSN uint16
58 var SubscriptionReqChan = make(chan subRouteInfo, 10)
59
60 const (
61         CREATE Action = 0
62         MERGE Action = 1
63         DELETE Action = 3
64 )
65
66 func init() {
67         viper.AutomaticEnv()
68         viper.SetEnvPrefix("submgr")
69         viper.AllowEmptyEnv(true)
70         SEEDSN = uint16(viper.GetInt("seed_sn"))
71         if SEEDSN == 0 {
72                 rand.Seed(time.Now().UnixNano())
73                 SEEDSN = uint16(rand.Intn(65535))
74         }
75         if SEEDSN > 65535 {
76                 SEEDSN = 0
77         }
78         xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
79 }
80
81 func NewControl() Control {
82         registry := new(Registry)
83         registry.Initialize(SEEDSN)
84
85         tracker := new(Tracker)
86         tracker.Init()
87
88         transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89         client := rtmgrclient.New(transport, strfmt.Default)
90         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91         delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
92         rtmgrClient := RtmgrClient{client, handle, delete_handle}
93
94         return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
95 }
96
97 func (c *Control) Run() {
98         go c.controlLoop()
99         xapp.Run(c)
100 }
101
102 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
103         c.rc_chan <- rp
104         return
105 }
106
107 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
108         if !xapp.Rmr.Send(params, false) {
109                 err = errors.New("rmr.Send() failed")
110         }
111         return
112 }
113
114 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
115         if !xapp.Rmr.Send(params, true) {
116                 err = errors.New("rmr.Send() failed")
117         }
118         return
119 }
120
121 func (c *Control) controlLoop() {
122         for {
123                 msg := <-c.rc_chan
124                 switch msg.Mtype {
125                         case C.RIC_SUB_REQ:
126                                 c.handleSubscriptionRequest(msg)
127                         case C.RIC_SUB_RESP:
128                                 c.handleSubscriptionResponse(msg)
129                         case C.RIC_SUB_DEL_REQ:
130                                 c.handleSubscriptionDeleteRequest(msg)
131                         case C.RIC_SUB_DEL_RESP:
132                                 c.handleSubscriptionDeleteResponse(msg)
133                         default:
134                                 err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
135                                 xapp.Logger.Error("Unknown message type: %v", err)
136                 }
137         }
138 }
139
140 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
141         payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
142         if err != nil {
143                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
144                 return
145         }
146         xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
147
148         /* Reserve a sequence number and set it in the payload */
149         new_sub_id := c.registry.ReserveSequenceNumber()
150
151         _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
152         if err != nil {
153                 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
154                 return
155         }
156
157         src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
158         if err != nil {
159                 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
160                 return
161         }
162
163         /* Create transatcion records for every subscription request */
164         xact_key := Transaction_key{new_sub_id, CREATE}
165         xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
166         err = c.tracker.Track_transaction(xact_key, xact_value)
167         if err != nil {
168                 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
169                 return
170         }
171
172         /* Update routing manager about the new subscription*/
173         sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
174         go c.rtmgrClient.SubscriptionRequestUpdate()
175         SubscriptionReqChan <- sub_route_action
176
177         // Setting new subscription ID in the RMR header
178         params.SubId = int(new_sub_id)
179
180         xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
181         c.rmrSend(params)
182         xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
183         return
184 }
185
186 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
187         payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
188         if err != nil {
189                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
190                 return
191         }
192         xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
193         if !c.registry.IsValidSequenceNumber(payload_seq_num) {
194                 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
195                 return
196         }
197         c.registry.setSubscriptionToConfirmed(payload_seq_num)
198         xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
199         transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
200         if err != nil {
201                 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
202                 return
203         }
204         xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
205         params.Mbuf = transaction.Mbuf
206         c.rmrReplyToSender(params)
207         return
208 }
209
210 func (act Action) String() string {
211         actions := [...]string{
212                 "CREATE",
213                 "MERGE",
214                 "DELETE",
215         }
216
217         if act < CREATE || act > DELETE {
218                 return "Unknown"
219         }
220         return actions[act]
221 }
222
223 func (act Action) valid() bool {
224         switch act {
225         case CREATE, MERGE, DELETE:
226                 return true
227         default:
228                 return false
229         }
230 }
231
232 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
233         payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
234         if err != nil {
235                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
236                 return
237         }
238         xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
239         if c.registry.IsValidSequenceNumber(payload_seq_num) {
240                 c.registry.deleteSubscription(payload_seq_num)
241                 trackErr := c.trackDeleteTransaction(params, payload_seq_num)
242                 if trackErr != nil {
243                         xapp.Logger.Error("Failed to create a transaction record due to %v", err)
244                         return trackErr
245                 }
246         }
247         xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
248         c.rmrSend(params)
249         return
250 }
251
252 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
253         src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
254         xact_key := Transaction_key{payload_seq_num, DELETE}
255         xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
256         err = c.tracker.Track_transaction(xact_key, xact_value)
257         return
258 }
259
260 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
261         payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
262         if err != nil {
263                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
264                 return
265         }
266     var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
267         sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
268         go c.rtmgrClient.SubscriptionRequestUpdate()
269         SubscriptionReqChan <- sub_route_action
270
271         xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
272         if c.registry.releaseSequenceNumber(payload_seq_num) {
273                 transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
274                 if err != nil {
275                         xapp.Logger.Error("Failed to create a transaction record due to %v", err)
276                         return
277                 }
278                 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
279                 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
280                 params.Mbuf = transaction.Mbuf
281                 c.rmrReplyToSender(params)
282         }
283         return
284 }