Open RMR connection in a new thread
[ric-plt/rtmgr.git] / pkg / nbi / control.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 package nbi
24
25 import "C"
26
27 import (
28         "errors"
29         //"fmt"
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31         "net/http"
32         "os"
33         "routing-manager/pkg/rpe"
34         "routing-manager/pkg/rtmgr"
35         "routing-manager/pkg/sbi"
36         "routing-manager/pkg/sdl"
37         "strconv"
38         "sync"
39         "time"
40 )
41
42 var m sync.Mutex
43
44 var nbiEngine Engine
45 var sbiEngine sbi.Engine
46 var sdlEngine sdl.Engine
47 var rpeEngine rpe.Engine
48
49 const INTERVAL time.Duration = 60
50
51 func NewControl() Control {
52         return Control{make(chan *xapp.RMRParams)}
53 }
54
55 type Control struct {
56         rcChan chan *xapp.RMRParams
57 }
58
59 func (c *Control) Run() {
60         var err error
61         go c.controlLoop()
62         nbiEngine, sbiEngine, sdlEngine, rpeEngine, err = initRtmgr()
63         if err != nil {
64                 xapp.Logger.Error(err.Error())
65                 os.Exit(1)
66         }
67
68         xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
69
70         xapp.Run(c)
71 }
72
73 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
74         resp, _ := dumpDebugData()
75         xapp.Resource.SendSymptomDataJson(w, r, resp, "platform/rttable.json")
76 }
77
78 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
79         c.rcChan <- rp
80         return
81 }
82
83 func initRtmgr() (nbiEngine Engine, sbiEngine sbi.Engine, sdlEngine sdl.Engine, rpeEngine rpe.Engine, err error) {
84         if nbiEngine, err = GetNbi(xapp.Config.GetString("nbi")); err == nil && nbiEngine != nil {
85                 if sbiEngine, err = sbi.GetSbi(xapp.Config.GetString("sbi")); err == nil && sbiEngine != nil {
86                         if sdlEngine, err = sdl.GetSdl(xapp.Config.GetString("sdl")); err == nil && sdlEngine != nil {
87                                 if rpeEngine, err = rpe.GetRpe(xapp.Config.GetString("rpe")); err == nil && rpeEngine != nil {
88                                         return nbiEngine, sbiEngine, sdlEngine, rpeEngine, nil
89                                 }
90                         }
91                 }
92         }
93         return nil, nil, nil, nil, err
94 }
95
96 func (c *Control) controlLoop() {
97         for {
98                 msg := <-c.rcChan
99                 c.recievermr(msg)
100         }
101 }
102
103 func (c *Control) recievermr(msg *xapp.RMRParams) {
104         xapp_msg := sbi.RMRParams{msg}
105         switch msg.Mtype {
106         case xapp.RICMessageTypes["RMRRM_REQ_TABLE"]:
107                 if rtmgr.Rtmgr_ready == false {
108                         xapp.Logger.Info("Update Route Table Request(RMR to RM), message discarded as routing manager is not ready")
109                 } else {
110                         xapp.Logger.Info("Update Route Table Request(RMR to RM)")
111                         go c.handleUpdateToRoutingManagerRequest(msg)
112                 }
113         case xapp.RICMessageTypes["RMRRM_TABLE_STATE"]:
114                 xapp.Logger.Info("state of table to route mgr %s,payload %s", xapp_msg.String(), msg.Payload)
115         default:
116                 err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
117                 xapp.Logger.Error("Unknown message type: %v", err)
118         }
119         xapp.Rmr.Free(msg.Mbuf)
120 }
121
122 func (c *Control) handleUpdateToRoutingManagerRequest(params *xapp.RMRParams) {
123
124         msg := sbi.RMRParams{params}
125
126         xapp.Logger.Info("Update Route Table Request, msg.String() : %s", msg.String())
127         xapp.Logger.Info("Update Route Table Request, params.Payload : %s", string(params.Payload))
128
129         m.Lock()
130         data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
131         m.Unlock()
132         if data == nil {
133                 if err != nil {
134                         xapp.Logger.Error("Cannot get data from sdl interface due to: " + err.Error())
135                         return
136                 } else {
137                         xapp.Logger.Debug("Cannot get data from sdl interface")
138                         return
139                 }
140         }
141
142         ep := sbiEngine.CheckEndpoint(string(params.Payload))
143         if ep == nil {
144                 xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
145                 return
146         }
147         epstr, whid := sbiEngine.CreateEndpoint(msg.String())
148         if epstr == nil || whid < 0 {
149                 xapp.Logger.Error("Wormhole Id creation failed %d for %s", whid, msg.String())
150                 return
151         }
152
153         /*This is to ensure the latest routes are sent.
154         Assumption is that in this time interval the routes are built for this endpoint */
155         time.Sleep(100 * time.Millisecond)
156         policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
157         err = sbiEngine.DistributeToEp(policies, *epstr, whid)
158         if err != nil {
159                 xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
160                 return
161         }
162 }
163
164 func getConfigData() (*rtmgr.RicComponents, error) {
165         var data *rtmgr.RicComponents
166         m.Lock()
167         data, err := sdlEngine.ReadAll(xapp.Config.GetString("rtfile"))
168
169         m.Unlock()
170         if data == nil {
171                 if err != nil {
172                         return nil, errors.New("Cannot get data from sdl interface due to: " + err.Error())
173                 } else {
174                         xapp.Logger.Debug("Cannot get data from sdl interface, data is null")
175                         return nil, errors.New("Cannot get data from sdl interface")
176                 }
177         }
178
179         return data, nil
180 }
181
182 func updateEp() (err error) {
183         data, err := getConfigData()
184         if err != nil {
185                 return errors.New("Routing table cannot be published due to: " + err.Error())
186         }
187         sbiEngine.UpdateEndpoints(data)
188
189         return nil
190 }
191
192 func sendRoutesToAll() (err error) {
193
194         data, err := getConfigData()
195         if err != nil {
196                 return errors.New("Routing table cannot be published due to: " + err.Error())
197         }
198
199         policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
200         err = sbiEngine.DistributeAll(policies)
201         if err != nil {
202                 return errors.New("Routing table cannot be published due to: " + err.Error())
203         }
204
205         return nil
206 }
207
208 func Serve() {
209
210         nbiErr := nbiEngine.Initialize(xapp.Config.GetString("xmurl"), xapp.Config.GetString("nbiurl"), xapp.Config.GetString("rtfile"), xapp.Config.GetString("cfgfile"), xapp.Config.GetString("e2murl"), sdlEngine, rpeEngine, &m)
211         if nbiErr != nil {
212                 xapp.Logger.Error("Failed to initialize nbi due to: " + nbiErr.Error())
213                 return
214         }
215
216         err := sbiEngine.Initialize(xapp.Config.GetString("sbiurl"))
217         if err != nil {
218                 xapp.Logger.Info("Failed to open push socket due to: " + err.Error())
219                 return
220         }
221         defer nbiEngine.Terminate()
222         defer sbiEngine.Terminate()
223
224         /* used for rtmgr restart case to connect to Endpoints */
225         go updateEp()
226         time.Sleep(5 * time.Second)
227
228         for {
229                 sendRoutesToAll()
230
231                 rtmgr.Rtmgr_ready = true
232                 time.Sleep(INTERVAL * time.Second)
233                 xapp.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
234         }
235 }