f6cd771e7e20a5e58af049cd5ad5dbbfec76dc7f
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 /*
23 #include <rmr/RIC_message_types.h>
24 #include <rmr/rmr.h>
25
26 #cgo CFLAGS: -I../
27 #cgo LDFLAGS: -lrmr_nng -lnng
28 */
29 import "C"
30
31 import (
32         "errors"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         "github.com/spf13/viper"
35         "github.com/go-openapi/strfmt"
36         httptransport "github.com/go-openapi/runtime/client"
37         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
39         "math/rand"
40         "strconv"
41         "time"
42 )
43
44 type Control struct {
45         e2ap        *E2ap
46         registry    *Registry
47         rtmgrClient *RtmgrClient
48         tracker     *Tracker
49 }
50
51 type RMRMeid struct {
52         PlmnID string
53         EnbID  string
54 }
55
56 type RMRParams struct {
57         Mtype           int
58         Payload         []byte
59         PayloadLen      int
60         Meid            *RMRMeid
61         Xid             string
62         SubId           int
63         Src             string
64         Mbuf            *C.rmr_mbuf_t
65 }
66
67 var SEEDSN uint16
68 var SubscriptionReqChan = make(chan subRouteInfo, 10)
69
70 const (
71         CREATE Action = 0
72         MERGE Action = 1
73         DELETE Action = 3
74 )
75
76 func init() {
77         viper.AutomaticEnv()
78         viper.SetEnvPrefix("submgr")
79         viper.AllowEmptyEnv(true)
80         SEEDSN = uint16(viper.GetInt("seed_sn"))
81         if SEEDSN == 0 {
82                 rand.Seed(time.Now().UnixNano())
83                 SEEDSN = uint16(rand.Intn(65535))
84         }
85         if SEEDSN > 65535 {
86                 SEEDSN = 0
87         }
88         xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
89 }
90
91 func NewControl() Control {
92         registry := new(Registry)
93         registry.Initialize(SEEDSN)
94
95         tracker := new(Tracker)
96         tracker.Init()
97
98         transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
99         client := rtmgrclient.New(transport, strfmt.Default)
100         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101         rtmgrClient := RtmgrClient{client, handle}
102
103         return Control{new(E2ap), registry, &rtmgrClient, tracker}
104 }
105
106 func (c *Control) Run() {
107         xapp.Run(c)
108 }
109
110 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
111         switch rp.Mtype {
112         case C.RIC_SUB_REQ:
113                 err = c.handleSubscriptionRequest(rp)
114         case C.RIC_SUB_RESP:
115                 err = c.handleSubscriptionResponse(rp)
116         case C.RIC_SUB_DEL_REQ:
117                 err = c.handleSubscriptionDeleteRequest(rp)
118         default:
119                 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
120         }
121         return
122 }
123
124 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
125         if !xapp.Rmr.Send(params, false) {
126                 err = errors.New("rmr.Send() failed")
127         }
128         return
129 }
130
131 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
132         payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
133         if err != nil {
134                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
135                 return
136         }
137         xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
138
139         /* Reserve a sequence number and set it in the payload */
140         new_sub_id := c.registry.ReserveSequenceNumber()
141
142         _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
143         if err != nil {
144                 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
145                 return
146         }
147
148         src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
149         if err != nil {
150                 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
151                 return
152         }
153
154         /* Create transatcion records for every subscription request */
155         xact_key := Transaction_key{new_sub_id, CREATE}
156         xact_value := Transaction{*src_addr, *src_port, params.Payload}
157         err = c.tracker.Track_transaction(xact_key, xact_value)
158         if err != nil {
159                 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
160                 return
161         }
162
163         /* Update routing manager about the new subscription*/
164         sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
165         go c.rtmgrClient.SubscriptionRequestUpdate()
166         SubscriptionReqChan <- sub_route_action
167
168         // Setting new subscription ID in the RMR header
169         params.SubId = int(new_sub_id)
170
171         xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
172         c.rmrSend(params)
173         return
174 }
175
176 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
177         payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
178         if err != nil {
179                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
180                 return
181         }
182         xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
183         if !c.registry.IsValidSequenceNumber(payload_seq_num) {
184                 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
185                 return
186         }
187         c.registry.setSubscriptionToConfirmed(payload_seq_num)
188         xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
189         c.rmrSend(params)
190         return
191 }
192
193 func (act Action) String() string {
194         actions := [...]string{
195                 "CREATE",
196                 "MERGE",
197                 "DELETE",
198         }
199
200         if act < CREATE || act > DELETE {
201                 return "Unknown"
202         }
203         return actions[act]
204 }
205
206 func (act Action) valid() bool {
207         switch act {
208         case CREATE, MERGE, DELETE:
209                 return true
210         default:
211                 return false
212         }
213 }
214
215 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
216         payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
217         if err != nil {
218                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
219                 return
220         }
221         xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
222         if c.registry.IsValidSequenceNumber(payload_seq_num) {
223                 c.registry.deleteSubscription(payload_seq_num)
224         }
225         xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
226         c.rmrSend(params)
227         return
228 }