Added config and logger module from xapp-fwk. Added Routes related to A1Mediator...
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 import (
32         "errors"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34         "nanomsg.org/go/mangos/v2"
35         "nanomsg.org/go/mangos/v2/protocol/push"
36         _ "nanomsg.org/go/mangos/v2/transport/all"
37         "routing-manager/pkg/rtmgr"
38         "strconv"
39 )
40
41 type NngPush struct {
42         Sbi
43         NewSocket CreateNewNngSocketHandler
44 }
45
46 func NewNngPush() *NngPush {
47         instance := new(NngPush)
48         instance.NewSocket = createNewPushSocket
49         return instance
50 }
51
52 func createNewPushSocket() (NngSocket, error) {
53         xapp.Logger.Debug("Invoked: createNewPushSocket()")
54         socket, err := push.NewSocket()
55         if err != nil {
56                 return nil, errors.New("can't create new push socket due to:" + err.Error())
57         }
58         socket.SetPipeEventHook(pipeEventHandler)
59         return socket, nil
60 }
61
62 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
63         xapp.Logger.Debug("Invoked: pipeEventHandler()")
64         xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
65         for _, ep := range rtmgr.Eps {
66                 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
67                 if uri == pipe.Address() {
68                         switch event {
69                         case 1:
70                                 ep.IsReady = true
71                                 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
72                         default:
73                                 ep.IsReady = false
74                                 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
75                         }
76                 }
77         }
78 }
79
80 func (c *NngPush) Initialize(ip string) error {
81         return nil
82 }
83
84 func (c *NngPush) Terminate() error {
85         return nil
86 }
87
88 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
89         var err error
90         var socket NngSocket
91         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
92         xapp.Logger.Debug("args: %v", *ep)
93         socket, err = c.NewSocket()
94         if err != nil {
95                 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
96         }
97         ep.Socket = socket
98         err = c.dial(ep)
99         if err != nil {
100                 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
101         }
102         return nil
103 }
104
105 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
106         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
107         xapp.Logger.Debug("args: %v", *ep)
108         if err := ep.Socket.(NngSocket).Close(); err != nil {
109                 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
110         }
111         return nil
112 }
113
114 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
115         c.updateEndpoints(rcs, c)
116 }
117
118 /*
119 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
120 */
121 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
122         xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
123         uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
124         options := make(map[string]interface{})
125         options[mangos.OptionDialAsynch] = true
126         if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
127                 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
128         }
129         return nil
130 }
131
132 func (c *NngPush) DistributeAll(policies *[]string) error {
133         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
134         xapp.Logger.Debug("args: %v", *policies)
135         for _, ep := range rtmgr.Eps {
136                 if ep.IsReady {
137                         go c.send(ep, policies)
138                 } else {
139                         xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
140                 }
141         }
142         return nil
143 }
144
145 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
146         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
147         for _, pe := range *policies {
148                 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
149                         xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
150                 }
151         }
152         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
153 }