Taking old commit for releasing image
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <time.h>
33 #include <stdlib.h>
34 #include <stdio.h>
35 #include <string.h>
36 #include <rmr/rmr.h>
37 #include <rmr/RIC_message_types.h>
38
39
40 #cgo CFLAGS: -I../
41 #cgo LDFLAGS: -lrmr_nng -lnng
42 */
43 import "C"
44
45 import (
46         "errors"
47         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
48         "nanomsg.org/go/mangos/v2"
49         "nanomsg.org/go/mangos/v2/protocol/push"
50         _ "nanomsg.org/go/mangos/v2/transport/all"
51         "routing-manager/pkg/rtmgr"
52         "strconv"
53         "time"
54 )
55
56 type NngPush struct {
57         Sbi
58         NewSocket CreateNewNngSocketHandler
59         rcChan    chan *xapp.RMRParams
60 }
61
62 func NewNngPush() *NngPush {
63         instance := new(NngPush)
64         instance.NewSocket = createNewPushSocket
65         return instance
66 }
67
68 func createNewPushSocket() (NngSocket, error) {
69         xapp.Logger.Debug("Invoked: createNewPushSocket()")
70         socket, err := push.NewSocket()
71         if err != nil {
72                 return nil, errors.New("can't create new push socket due to:" + err.Error())
73         }
74         socket.SetPipeEventHook(pipeEventHandler)
75         return socket, nil
76 }
77
78 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
79         xapp.Logger.Debug("Invoked: pipeEventHandler()")
80         xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
81         for _, ep := range rtmgr.Eps {
82                 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
83                 if uri == pipe.Address() {
84                         switch event {
85                         case 1:
86                                 ep.IsReady = true
87                                 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
88                         default:
89                                 ep.IsReady = false
90                                 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
91                         }
92                 }
93         }
94 }
95
96 func (c *NngPush) Initialize(ip string) error {
97         return nil
98 }
99
100 func (c *NngPush) Terminate() error {
101         return nil
102 }
103
104 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
105         var err error
106         var socket NngSocket
107         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
108         xapp.Logger.Debug("args: %v", *ep)
109         socket, err = c.NewSocket()
110         if err != nil {
111                 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
112         }
113         ep.Socket = socket
114         err = c.dial(ep)
115         if err != nil {
116                 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
117         }
118         return nil
119 }
120
121 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
122         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
123         xapp.Logger.Debug("args: %v", *ep)
124         if err := ep.Socket.(NngSocket).Close(); err != nil {
125                 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
126         }
127         return nil
128 }
129
130 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
131         c.updateEndpoints(rcs, c)
132 }
133
134 /*
135 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
136 */
137 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
138         xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
139         uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
140         options := make(map[string]interface{})
141         options[mangos.OptionDialAsynch] = true
142         if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
143                 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
144         }
145         return nil
146 }
147
148 func (c *NngPush) DistributeAll(policies *[]string) error {
149         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
150         xapp.Logger.Debug("args: %v", *policies)
151         for _, ep := range rtmgr.Eps {
152                 i := 1
153                 for i < 5 {
154                         if ep.IsReady {
155                                 go c.send(ep, policies)
156                                 break
157                         } else {
158                                 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
159                                 time.Sleep(10 * time.Millisecond)
160                                 i++
161                         }
162                 }
163         }
164         return nil
165 }
166
167 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
168         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
169         for _, pe := range *policies {
170                 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
171                         xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
172                 }
173         }
174         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
175 }