Adding scope of RICPlatform that are under Apache License
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 import (
32         "errors"
33         "routing-manager/pkg/rtmgr"
34         "strconv"
35
36         "nanomsg.org/go/mangos/v2"
37         "nanomsg.org/go/mangos/v2/protocol/push"
38         _ "nanomsg.org/go/mangos/v2/transport/all"
39 )
40
41 type NngPush struct {
42         Sbi
43         NewSocket CreateNewNngSocketHandler
44 }
45
46 func NewNngPush() *NngPush {
47         instance := new(NngPush)
48         instance.NewSocket = createNewPushSocket
49         return instance
50 }
51
52 func createNewPushSocket() (NngSocket, error) {
53         rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
54         socket, err := push.NewSocket()
55         if err != nil {
56                 return nil, errors.New("can't create new push socket due to:" + err.Error())
57         }
58         socket.SetPipeEventHook(pipeEventHandler)
59         return socket, nil
60 }
61
62 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
63         rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
64         for _, ep := range rtmgr.Eps {
65                 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
66                 if uri == pipe.Address() {
67                         switch event {
68                         case 1:
69                                 ep.IsReady = true
70                                 rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
71                         default:
72                                 ep.IsReady = false
73                                 rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
74                         }
75                 }
76         }
77 }
78
79 func (c *NngPush) Initialize(ip string) error {
80         return nil
81 }
82
83 func (c *NngPush) Terminate() error {
84         return nil
85 }
86
87 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
88         var err error
89         var socket NngSocket
90         rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
91         rtmgr.Logger.Debug("args: %v", *ep)
92         socket, err = c.NewSocket()
93         if err != nil {
94                 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
95         }
96         ep.Socket = socket
97         err = c.dial(ep)
98         if err != nil {
99                 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
100         }
101         return nil
102 }
103
104 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
105         rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
106         rtmgr.Logger.Debug("args: %v", *ep)
107         if err := ep.Socket.(NngSocket).Close(); err != nil {
108                 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
109         }
110         return nil
111 }
112
113 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
114         c.updateEndpoints(rcs, c)
115 }
116
117 /*
118 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
119 */
120 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
121         rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
122         uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
123         options := make(map[string]interface{})
124         options[mangos.OptionDialAsynch] = true
125         if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
126                 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
127         }
128         return nil
129 }
130
131 func (c *NngPush) DistributeAll(policies *[]string) error {
132         rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
133         rtmgr.Logger.Debug("args: %v", *policies)
134         for _, ep := range rtmgr.Eps {
135                 if ep.IsReady {
136                         go c.send(ep, policies)
137                 } else {
138                         rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
139                 }
140         }
141         return nil
142 }
143
144 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
145         rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
146         for _, pe := range *policies {
147                 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
148                         rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
149                 }
150         }
151         rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
152 }