1f0e0e6fa7d1ce6db934f37ad2d9a15555c03782
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     rmrpipe.go
25   Abstract: mangos (RMR) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <rmr/rmr.h>
33 */
34 import "C"
35
36 import (
37         "bytes"
38         "crypto/md5"
39         "errors"
40         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
41         "routing-manager/pkg/rtmgr"
42         "strconv"
43         "strings"
44         "fmt"
45 )
46
47 var rmrcallid = 1
48 var rmrdynamiccallid = 201
49
50 type RmrPush struct {
51         Sbi
52         rcChan chan *xapp.RMRParams
53 }
54
55 type EPStatus struct {
56         endpoint string
57         status   bool
58 }
59
60 type RMRParams struct {
61         *xapp.RMRParams
62 }
63
64 func (params *RMRParams) String() string {
65         var b bytes.Buffer
66         sum := md5.Sum(params.Payload)
67         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
68         return b.String()
69 }
70
71 func NewRmrPush() *RmrPush {
72         instance := new(RmrPush)
73         return instance
74 }
75
76 func (c *RmrPush) Initialize(ip string) error {
77         return nil
78 }
79
80 func (c *RmrPush) Terminate() error {
81         return nil
82 }
83
84 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
85
86         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
87         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
88         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
89         if ep.Whid < 0 {
90                 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
91         } else {
92                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
93         }
94
95         return nil
96 }
97
98 func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
99         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100         xapp.Logger.Debug("args: %v", *ep)
101
102         xapp.Rmr.Closewh(ep.Whid)
103         return nil
104 }
105
106 func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
107         c.updateEndpoints(rcs, c)
108 }
109
110 func (c *RmrPush) DistributeAll(policies *[]string) error {
111         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112         xapp.Logger.Debug("args: %v", *policies)
113
114         /*for _, ep := range rtmgr.Eps {
115                 go c.send(ep, policies)
116         }*/
117         channel := make(chan EPStatus)
118
119         if rmrcallid == 200 {
120                 rmrcallid = 1
121         }
122
123         for _, ep := range rtmgr.Eps {
124                 go c.send_sync(ep,  policies, channel, rmrcallid)
125         }
126         rmrcallid++
127
128         count := 0
129         result := make([]EPStatus, len(rtmgr.Eps))
130         for i, _ := range result {
131                 result[i] = <-channel
132                 if result[i].status == true {
133                         count++
134                 } else {
135                         xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
136                 }
137         }
138
139         if count < len(rtmgr.Eps) {
140                 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
141         }
142
143
144         return nil
145 }
146
147 func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
148         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
149
150         ret := c.send_data(ep, policies, call_id)
151
152         channel <- EPStatus{ep.Uuid, ret}
153
154 }
155
156 func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
157         xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
158         var state int
159         var retstr string
160
161         var policy = []byte{}
162
163         for _, pe := range *policies {
164                 b:= []byte(pe)
165                 for j:=0; j<len(b); j++{
166                         policy = append(policy,b[j])
167                 }
168         }
169         params := &RMRParams{&xapp.RMRParams{}}
170         params.Mtype = 20
171         params.PayloadLen = len(policy)
172         params.Payload =[]byte(policy)
173         params.Mbuf = nil
174         params.Whid = ep.Whid
175         params.Callid = call_id
176         params.Timeout = 200
177         state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
178         routestatus := strings.Split(retstr," ")
179         if state != C.RMR_OK && routestatus[0] == "OK" {
180               xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
181               return false
182         } else {
183                 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
184               return true
185         }
186
187         xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
188         return false
189 }
190
191 func (c *RmrPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
192         return c.createEndpoint(payload, c)
193 }
194
195 func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
196         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
197         xapp.Logger.Debug("args: %v", *policies)
198
199         if rmrdynamiccallid == 255 {
200                 rmrdynamiccallid = 201
201         }
202
203         go c.send_data(ep, policies,rmrdynamiccallid)
204         rmrdynamiccallid++
205
206         return nil
207 }