44512999cba04b37142b1ae9284272926d5289c9
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <time.h>
33 #include <stdlib.h>
34 #include <stdio.h>
35 #include <string.h>
36 #include <rmr/rmr.h>
37 #include <rmr/RIC_message_types.h>
38
39
40 #cgo CFLAGS: -I../
41 #cgo LDFLAGS: -lrmr_nng -lnng
42 */
43 import "C"
44
45 import (
46         "bytes"
47         "crypto/md5"
48         "errors"
49         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
50         "nanomsg.org/go/mangos/v2"
51         "nanomsg.org/go/mangos/v2/protocol/push"
52         _ "nanomsg.org/go/mangos/v2/transport/all"
53         "routing-manager/pkg/rtmgr"
54         "strconv"
55         "time"
56         "fmt"
57 )
58
59 type NngPush struct {
60         Sbi
61         NewSocket CreateNewNngSocketHandler
62         rcChan    chan *xapp.RMRParams
63 }
64
65 type RMRParams struct {
66         *xapp.RMRParams
67 }
68
69
70 func (params *RMRParams) String() string {
71         var b bytes.Buffer
72         sum := md5.Sum(params.Payload)
73         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
74         return b.String()
75 }
76
77 func NewNngPush() *NngPush {
78         instance := new(NngPush)
79         instance.NewSocket = createNewPushSocket
80         return instance
81 }
82
83 func createNewPushSocket() (NngSocket, error) {
84         xapp.Logger.Debug("Invoked: createNewPushSocket()")
85         socket, err := push.NewSocket()
86         if err != nil {
87                 return nil, errors.New("can't create new push socket due to:" + err.Error())
88         }
89         socket.SetPipeEventHook(pipeEventHandler)
90         return socket, nil
91 }
92
93 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
94         xapp.Logger.Debug("Invoked: pipeEventHandler()")
95         xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
96         for _, ep := range rtmgr.Eps {
97                 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
98                 if uri == pipe.Address() {
99                         switch event {
100                         case 1:
101                                 ep.IsReady = true
102                                 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
103                         default:
104                                 ep.IsReady = false
105                                 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
106                         }
107                 }
108         }
109 }
110
111 func (c *NngPush) Initialize(ip string) error {
112         return nil
113 }
114
115 func (c *NngPush) Terminate() error {
116         return nil
117 }
118
119 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
120
121         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
122         xapp.Logger.Debug("args: %v", *ep)
123         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
124         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
125         if ep.Whid < 0   {
126                 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
127         }else {
128                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
129         }
130
131         return nil
132 }
133
134 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
135         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
136         xapp.Logger.Debug("args: %v", *ep)
137
138         xapp.Rmr.Closewh(ep.Whid)
139         return nil
140 }
141
142 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
143         c.updateEndpoints(rcs, c)
144 }
145
146 /*
147 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
148 */
149 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
150         xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
151         uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
152         options := make(map[string]interface{})
153         options[mangos.OptionDialAsynch] = true
154         if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
155                 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
156         }
157         return nil
158 }
159
160 func (c *NngPush) DistributeAll(policies *[]string) error {
161         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
162         xapp.Logger.Debug("args: %v", *policies)
163
164         for _, ep := range rtmgr.Eps {
165                 go c.send(ep, policies)
166         }
167
168         return nil
169 }
170
171 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
172         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
173
174         for _, pe := range *policies {
175                 params := &RMRParams{&xapp.RMRParams{}}
176                 params.Mtype = 20
177                 params.PayloadLen = len([]byte(pe))
178                 params.Payload =[]byte(pe)
179                 params.Mbuf = nil
180                 params.Whid = ep.Whid
181                 time.Sleep(1 * time.Millisecond)
182                 xapp.Rmr.SendMsg(params.RMRParams)
183         }
184         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
185 }
186
187 func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
188         return c.createEndpoint(payload, c)
189 }
190
191 func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
192         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
193         xapp.Logger.Debug("args: %v", *policies)
194
195         go c.send(ep, policies)
196
197         return nil
198 }
199