Temporary Fix for R3 (E2M->E2T issue) - retrying when is_Ready flag in socket handle...
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 import (
32         "errors"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         "nanomsg.org/go/mangos/v2"
35         "nanomsg.org/go/mangos/v2/protocol/push"
36         _ "nanomsg.org/go/mangos/v2/transport/all"
37         "routing-manager/pkg/rtmgr"
38         "strconv"
39         "time"
40 )
41
42 type NngPush struct {
43         Sbi
44         NewSocket CreateNewNngSocketHandler
45 }
46
47 func NewNngPush() *NngPush {
48         instance := new(NngPush)
49         instance.NewSocket = createNewPushSocket
50         return instance
51 }
52
53 func createNewPushSocket() (NngSocket, error) {
54         xapp.Logger.Debug("Invoked: createNewPushSocket()")
55         socket, err := push.NewSocket()
56         if err != nil {
57                 return nil, errors.New("can't create new push socket due to:" + err.Error())
58         }
59         socket.SetPipeEventHook(pipeEventHandler)
60         return socket, nil
61 }
62
63 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
64         xapp.Logger.Debug("Invoked: pipeEventHandler()")
65         xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
66         for _, ep := range rtmgr.Eps {
67                 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
68                 if uri == pipe.Address() {
69                         switch event {
70                         case 1:
71                                 ep.IsReady = true
72                                 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
73                         default:
74                                 ep.IsReady = false
75                                 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
76                         }
77                 }
78         }
79 }
80
81 func (c *NngPush) Initialize(ip string) error {
82         return nil
83 }
84
85 func (c *NngPush) Terminate() error {
86         return nil
87 }
88
89 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
90         var err error
91         var socket NngSocket
92         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
93         xapp.Logger.Debug("args: %v", *ep)
94         socket, err = c.NewSocket()
95         if err != nil {
96                 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
97         }
98         ep.Socket = socket
99         err = c.dial(ep)
100         if err != nil {
101                 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
102         }
103         return nil
104 }
105
106 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
107         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
108         xapp.Logger.Debug("args: %v", *ep)
109         if err := ep.Socket.(NngSocket).Close(); err != nil {
110                 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
111         }
112         return nil
113 }
114
115 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
116         c.updateEndpoints(rcs, c)
117 }
118
119 /*
120 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
121 */
122 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
123         xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
124         uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
125         options := make(map[string]interface{})
126         options[mangos.OptionDialAsynch] = true
127         if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
128                 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
129         }
130         return nil
131 }
132
133 /*
134 func (c *NngPush) DistributeAll(policies *[]string) error {
135         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
136         xapp.Logger.Debug("args: %v", *policies)
137         for _, ep := range rtmgr.Eps {
138                         if ep.IsReady {
139                                 go c.send(ep, policies)
140                         } else {
141                                 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
142                         }
143                 }
144         }
145         return nil
146 }
147
148 */
149
150 /*
151         Temporary solution for R3 - E2M -> E2T issue
152 */
153 func (c *NngPush) DistributeAll(policies *[]string) error {
154         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
155         xapp.Logger.Debug("args: %v", *policies)
156         for _, ep := range rtmgr.Eps {
157                 i := 1
158                 for i< 5 {
159                         if ep.IsReady {
160                                 go c.send(ep, policies)
161                                 break
162                         } else {
163                                 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
164                                 time.Sleep(10 * time.Millisecond)
165                                 i++
166                         }
167                 }
168         }
169         return nil
170 }
171
172 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
173         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
174         for _, pe := range *policies {
175                 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
176                         xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
177                 }
178         }
179         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
180 }