0.7.1 Version of submgr
[ric-plt/submgr.git] / pkg / control / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 /*
23 #include <rmr/RIC_message_types.h>
24 #include <rmr/rmr.h>
25
26 #cgo CFLAGS: -I../
27 #cgo LDFLAGS: -lrmr_nng -lnng
28 */
29 import "C"
30
31 import (
32         "errors"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         "github.com/spf13/viper"
35         "github.com/go-openapi/strfmt"
36         httptransport "github.com/go-openapi/runtime/client"
37         rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38         rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
39         "math/rand"
40         "strconv"
41         "time"
42 )
43
44 type Control struct {
45         e2ap        *E2ap
46         registry    *Registry
47         rtmgrClient *RtmgrClient
48         tracker     *Tracker
49 }
50
51 type RMRMeid struct {
52         PlmnID string
53         EnbID  string
54 }
55
56 type RMRParams struct {
57         Mtype           int
58         Payload         []byte
59         PayloadLen      int
60         Meid            *RMRMeid
61         Xid             string
62         SubId           int
63         Src             string
64         Mbuf            *C.rmr_mbuf_t
65 }
66
67 var SEEDSN uint16
68 var SubscriptionReqChan = make(chan subRouteInfo, 10)
69
70 const (
71         CREATE Action = 0
72         MERGE Action = 1
73         DELETE Action = 3
74 )
75
76 func init() {
77         viper.AutomaticEnv()
78         viper.SetEnvPrefix("submgr")
79         viper.AllowEmptyEnv(true)
80         SEEDSN = uint16(viper.GetInt("seed_sn"))
81         if SEEDSN == 0 {
82                 rand.Seed(time.Now().UnixNano())
83                 SEEDSN = uint16(rand.Intn(65535))
84         }
85         if SEEDSN > 65535 {
86                 SEEDSN = 0
87         }
88         xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
89 }
90
91 func NewControl() Control {
92         registry := new(Registry)
93         registry.Initialize(SEEDSN)
94
95         transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
96         client := rtmgrclient.New(transport, strfmt.Default)
97         handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
98         rtmgrClient := RtmgrClient{client, handle}
99
100         return Control{new(E2ap), registry, &rtmgrClient, new(Tracker)}
101 }
102
103 func (c *Control) Run() {
104         xapp.Run(c)
105 }
106
107 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
108         switch rp.Mtype {
109         case C.RIC_SUB_REQ:
110                 err = c.handleSubscriptionRequest(rp)
111         case C.RIC_SUB_RESP:
112                 err = c.handleSubscriptionResponse(rp)
113         case C.RIC_SUB_DEL_REQ:
114                 err = c.handleSubscriptionDeleteRequest(rp)
115         default:
116                 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
117         }
118         return
119 }
120
121 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
122         if !xapp.Rmr.Send(params, false) {
123                 err = errors.New("rmr.Send() failed")
124         }
125         return
126 }
127
128 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
129         payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
130         if err != nil {
131                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
132                 return
133         }
134         xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
135
136         /* Reserve a sequence number and set it in the payload */
137         new_sub_id := c.registry.ReserveSequenceNumber()
138
139         _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
140         if err != nil {
141                 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
142                 return
143         }
144
145         src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
146         if err != nil {
147                 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
148                 return
149         }
150
151         /* Create transatcion records for every subscription request */
152         xact_key := Transaction_key{new_sub_id, CREATE}
153         xact_value := Transaction{*src_addr, *src_port, params.Payload}
154         err = c.tracker.Track_transaction(xact_key, &xact_value)
155         if err != nil {
156                 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
157                 return
158         }
159
160         /* Update routing manager about the new subscription*/
161         sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
162         go c.rtmgrClient.SubscriptionRequestUpdate()
163         SubscriptionReqChan <- sub_route_action
164
165         // Setting new subscription ID in the RMR header
166         params.SubId = int(new_sub_id)
167
168         xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
169         c.rmrSend(params)
170         return
171 }
172
173 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
174         payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
175         if err != nil {
176                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
177                 return
178         }
179         xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
180         if !c.registry.IsValidSequenceNumber(payload_seq_num) {
181                 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
182                 return
183         }
184         c.registry.setSubscriptionToConfirmed(payload_seq_num)
185         xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
186         c.rmrSend(params)
187         return
188 }
189
190 func (act Action) String() string {
191         actions := [...]string{
192                 "CREATE",
193                 "MERGE",
194                 "DELETE",
195         }
196
197         if act < CREATE || act > DELETE {
198                 return "Unknown"
199         }
200         return actions[act]
201 }
202
203 func (act Action) valid() bool {
204         switch act {
205         case CREATE, MERGE, DELETE:
206                 return true
207         default:
208                 return false
209         }
210 }
211
212 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
213         payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
214         if err != nil {
215                 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
216                 return
217         }
218         xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
219         if c.registry.IsValidSequenceNumber(payload_seq_num) {
220                 c.registry.deleteSubscription(payload_seq_num)
221         }
222         xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
223         c.rmrSend(params)
224         return
225 }