ec6419e3794bfc149c199bfe50424a4c9a1fbbe8
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 /*
23 #include <rmr/RIC_message_types.h>
24 #include <rmr/rmr.h>
25
26 #cgo CFLAGS: -I../
27 #cgo LDFLAGS: -lrmr_nng -lnng
28 */
29 import "C"
30
31 import (
32         "errors"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         "github.com/spf13/viper"
35         "github.com/go-openapi/strfmt"
36         httptransport "github.com/go-openapi/runtime/client"
37         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
39         "math/rand"
40         "strconv"
41         "time"
42 )
43
44 type Control struct {
45         e2ap        *E2ap
46         registry    *Registry
47         rtmgrClient *RtmgrClient
48         tracker     *Tracker
49 }
50
51 type RMRMeid struct {
52         PlmnID string
53         EnbID  string
54 }
55
56 var SEEDSN uint16
57 var SubscriptionReqChan = make(chan subRouteInfo, 10)
58
59 const (
60         CREATE Action = 0
61         MERGE Action = 1
62         DELETE Action = 3
63 )
64
65 func init() {
66         viper.AutomaticEnv()
67         viper.SetEnvPrefix("submgr")
68         viper.AllowEmptyEnv(true)
69         SEEDSN = uint16(viper.GetInt("seed_sn"))
70         if SEEDSN == 0 {
71                 rand.Seed(time.Now().UnixNano())
72                 SEEDSN = uint16(rand.Intn(65535))
73         }
74         if SEEDSN > 65535 {
75                 SEEDSN = 0
76         }
77         xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
78 }
79
80 func NewControl() Control {
81         registry := new(Registry)
82         registry.Initialize(SEEDSN)
83
84         tracker := new(Tracker)
85         tracker.Init()
86
87         transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88         client := rtmgrclient.New(transport, strfmt.Default)
89         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
90         delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91         rtmgrClient := RtmgrClient{client, handle, delete_handle}
92
93         return Control{new(E2ap), registry, &rtmgrClient, tracker}
94 }
95
96 func (c *Control) Run() {
97         xapp.Run(c)
98 }
99
100 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
101         switch rp.Mtype {
102         case C.RIC_SUB_REQ:
103                 err = c.handleSubscriptionRequest(rp)
104         case C.RIC_SUB_RESP:
105                 err = c.handleSubscriptionResponse(rp)
106         case C.RIC_SUB_DEL_REQ:
107                 err = c.handleSubscriptionDeleteRequest(rp)
108         case C.RIC_SUB_DEL_RESP:
109                 err = c.handleSubscriptionDeleteResponse(rp)
110         default:
111                 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
112         }
113         return
114 }
115
116 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
117         if !xapp.Rmr.Send(params, false) {
118                 err = errors.New("rmr.Send() failed")
119         }
120         return
121 }
122
123 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
124         if !xapp.Rmr.Send(params, true) {
125                 err = errors.New("rmr.Send() failed")
126         }
127         return
128 }
129
130 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
131         payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
132         if err != nil {
133                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
134                 return
135         }
136         xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
137
138         /* Reserve a sequence number and set it in the payload */
139         new_sub_id := c.registry.ReserveSequenceNumber()
140
141         _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
142         if err != nil {
143                 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
144                 return
145         }
146
147         src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
148         if err != nil {
149                 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
150                 return
151         }
152
153         /* Create transatcion records for every subscription request */
154         xact_key := Transaction_key{new_sub_id, CREATE}
155         xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
156         err = c.tracker.Track_transaction(xact_key, xact_value)
157         if err != nil {
158                 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
159                 return
160         }
161
162         /* Update routing manager about the new subscription*/
163         sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
164         go c.rtmgrClient.SubscriptionRequestUpdate()
165         SubscriptionReqChan <- sub_route_action
166
167         // Setting new subscription ID in the RMR header
168         params.SubId = int(new_sub_id)
169
170         xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
171         c.rmrSend(params)
172         xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
173         return
174 }
175
176 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
177         payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
178         if err != nil {
179                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
180                 return
181         }
182         xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
183         if !c.registry.IsValidSequenceNumber(payload_seq_num) {
184                 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
185                 return
186         }
187         c.registry.setSubscriptionToConfirmed(payload_seq_num)
188         xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
189         transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
190         if err != nil {
191                 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
192                 return
193         }
194         xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
195         params.Mbuf = transaction.Mbuf
196         c.rmrReplyToSender(params)
197         return
198 }
199
200 func (act Action) String() string {
201         actions := [...]string{
202                 "CREATE",
203                 "MERGE",
204                 "DELETE",
205         }
206
207         if act < CREATE || act > DELETE {
208                 return "Unknown"
209         }
210         return actions[act]
211 }
212
213 func (act Action) valid() bool {
214         switch act {
215         case CREATE, MERGE, DELETE:
216                 return true
217         default:
218                 return false
219         }
220 }
221
222 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
223         payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
224         if err != nil {
225                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
226                 return
227         }
228         xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
229         if c.registry.IsValidSequenceNumber(payload_seq_num) {
230                 c.registry.deleteSubscription(payload_seq_num)
231                 trackErr := c.trackDeleteTransaction(params, payload_seq_num)
232                 if trackErr != nil {
233                         xapp.Logger.Error("Failed to create a transaction record due to %v", err)
234                         return trackErr
235                 }
236         }
237         xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
238         c.rmrSend(params)
239         return
240 }
241
242 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
243         src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
244         xact_key := Transaction_key{payload_seq_num, DELETE}
245         xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
246         err = c.tracker.Track_transaction(xact_key, xact_value)
247         return
248 }
249
250 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
251         payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
252         if err != nil {
253                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
254                 return
255         }
256     var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
257         sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
258         go c.rtmgrClient.SubscriptionRequestUpdate()
259         SubscriptionReqChan <- sub_route_action
260
261         xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
262         if c.registry.releaseSequenceNumber(payload_seq_num) {
263                 transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
264                 if err != nil {
265                         xapp.Logger.Error("Failed to create a transaction record due to %v", err)
266                         return
267                 }
268                 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
269                 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
270                 params.Mbuf = transaction.Mbuf
271                 c.rmrReplyToSender(params)
272         }
273         return
274 }