2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
33 "nanomsg.org/go/mangos/v2"
34 "nanomsg.org/go/mangos/v2/protocol/push"
35 _ "nanomsg.org/go/mangos/v2/transport/all"
36 "routing-manager/pkg/rtmgr"
// NewSocket is the socket factory of NngPush: NewNngPush sets it to
// createNewPushSocket, and AddEndpoint calls it when an endpoint is added.
42 NewSocket CreateNewNngSocketHandler
// NewNngPush allocates a zero-valued NngPush and sets its NewSocket factory to
// createNewPushSocket.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the freshly allocated instance is returned - confirm.
45 func NewNngPush() *NngPush {
46 instance := new(NngPush)
// NewSocket is an assignable field, so the factory can be replaced after
// construction; presumably this is a seam for tests - confirm.
47 instance.NewSocket = createNewPushSocket
// createNewPushSocket creates a mangos PUSH-protocol socket with
// push.NewSocket and registers pipeEventHandler as its pipe event hook.
//
// The visible error path returns a nil socket together with an error whose
// message embeds the text of the underlying error.
// NOTE(review): the guard around the error return and the final return are not
// visible in this excerpt; presumably the error is returned only when
// push.NewSocket fails and the socket is returned otherwise - confirm.
51 func createNewPushSocket() (NngSocket, error) {
52 rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
53 socket, err := push.NewSocket()
// The underlying error is flattened into the message text (not wrapped), so
// callers cannot unwrap it.
55 return nil, errors.New("can't create new push socket due to:" + err.Error())
// Register pipeEventHandler as the hook for this socket's pipe events.
57 socket.SetPipeEventHook(pipeEventHandler)
// pipeEventHandler is the pipe event hook that createNewPushSocket registers
// on push sockets. It logs the address of the pipe the event refers to and
// compares that address with the URI built for every endpoint in rtmgr.Eps.
//
// Parameters:
//   event - the pipe event reported by mangos.
//   pipe  - the pipe the event refers to; the visible lines only read
//           pipe.Address().
//
// NOTE(review): the branch that selects between the "successfully attached"
// and "has been detached" messages, and any endpoint state it updates, is not
// visible in this excerpt; presumably it switches on event - confirm.
61 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
62 rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
63 rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
64 for _, ep := range rtmgr.Eps {
// Rebuild the endpoint URI with the same expression dial uses, so it can be
// matched against the address reported by the pipe.
65 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
66 if uri == pipe.Address() {
70 rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
73 rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
// Initialize accepts ip and returns an error.
// NOTE(review): the body is not visible in this excerpt, so whether ip is used
// and what (if anything) is set up cannot be stated here - confirm.
79 func (c *NngPush) Initialize(ip string) error {
// Terminate takes no arguments and returns an error.
// NOTE(review): the body is not visible in this excerpt, so what (if anything)
// is torn down cannot be stated here - confirm.
83 func (c *NngPush) Terminate() error {
// AddEndpoint obtains a socket from the c.NewSocket factory for the endpoint
// ep.
//
// Parameters:
//   ep - the endpoint being added; it is dereferenced for logging, so it must
//        not be nil.
//
// Returns an error that names ep.Uuid and embeds the underlying error text;
// the two visible messages cover adding the socket and dialing the endpoint.
// NOTE(review): the declarations of socket and err, both error checks, the
// hand-over of the socket to ep and the dial call are not visible in this
// excerpt; presumably the socket is stored in ep.Socket, c.dial(ep) is called
// and nil is returned on success - confirm.
87 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
90 rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
// *ep is evaluated before Debug is called: a nil ep panics here.
// NOTE(review): the %v verb only works if Debug treats its first argument as a
// format string - confirm against the logger's API.
91 rtmgr.Logger.Debug("args: %v", *ep)
// Plain assignment (not :=): socket and err are declared on lines that are not
// shown in this excerpt.
92 socket, err = c.NewSocket()
94 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
99 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
// DeleteEndpoint closes the socket held in ep.Socket.
//
// Parameters:
//   ep - the endpoint whose socket is closed; it is dereferenced for logging,
//        so it must not be nil.
//
// Returns an error that names ep.Uuid and embeds the underlying error text
// when Close fails.
// NOTE(review): the success return is not visible in this excerpt; presumably
// nil - confirm.
104 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
105 rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
// *ep is evaluated before Debug is called: a nil ep panics here.
106 rtmgr.Logger.Debug("args: %v", *ep)
// Unchecked type assertion: this panics if ep.Socket does not hold an
// NngSocket, including when ep.Socket is nil.
107 if err := ep.Socket.(NngSocket).Close(); err != nil {
108 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
// UpdateEndpoints forwards rcs to c.updateEndpoints, passing c itself as the
// second argument.
// NOTE(review): updateEndpoints is not defined in this excerpt; presumably it
// is promoted from an embedded type and receives c so it can call back into
// AddEndpoint/DeleteEndpoint - confirm.
113 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
114 c.updateEndpoints(rcs, c)
118 NOTE: Asynchronous dial starts a goroutine which keeps maintaining the connection to the given endpoint
// dial dials the socket held in ep.Socket to the endpoint URI, which is built
// as DefaultNngPipelineSocketPrefix + ep.Ip + ":" +
// DefaultNngPipelineSocketNumber. The dial is issued with
// mangos.OptionDialAsynch set to true.
//
// Parameters:
//   ep - the endpoint to dial; ep.Uuid, ep.Ip and ep.Socket are read.
//
// Returns an error that names the URI and embeds the underlying error text
// when DialOptions fails.
// NOTE(review): the success return is not visible in this excerpt; presumably
// nil - confirm.
120 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
121 rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
122 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
123 options := make(map[string]interface{})
// Request an asynchronous dial.
124 options[mangos.OptionDialAsynch] = true
// Unchecked type assertion: this panics if ep.Socket does not hold an
// NngSocket, including when ep.Socket is nil.
125 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
126 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
// DistributeAll walks the endpoints in rtmgr.Eps and, per endpoint, either
// starts a goroutine running c.send with the given policies or logs a warning
// that the endpoint is not ready.
//
// Parameters:
//   policies - pointer to the slice of policy entries; it is dereferenced for
//              logging, so it must not be nil.
//
// NOTE(review): the condition that selects between sending and the "is not
// ready" warning is not visible in this excerpt (presumably an endpoint
// readiness flag), and neither is the returned value - confirm both.
131 func (c *NngPush) DistributeAll(policies *[]string) error {
132 rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
// *policies is evaluated before Debug is called: a nil policies panics here.
133 rtmgr.Logger.Debug("args: %v", *policies)
134 for _, ep := range rtmgr.Eps {
// Fire-and-forget: send returns nothing, so a failed push is only logged and
// cannot influence the error returned by DistributeAll.
136 go c.send(ep, policies)
138 rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
// send iterates over *policies in order and sends each entry as its own
// message (the string converted to []byte) on the socket held in ep.Socket.
// A failed Send is logged at error level; nothing is returned to the caller.
//
// Parameters:
//   ep       - the target endpoint; ep.Uuid and ep.Socket are read.
//   policies - pointer to the policy entries to push; must not be nil.
//
// NOTE(review): what follows a failed Send (continue, break or return) is not
// visible in this excerpt, so it is unclear whether the final info log can be
// reached after a failure - confirm.
144 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
145 rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
146 for _, pe := range *policies {
// Unchecked type assertion: this panics if ep.Socket does not hold an
// NngSocket, including when ep.Socket is nil.
147 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
148 rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
// The count logged is the total number of entries in *policies, not the number
// of entries that were sent successfully.
151 rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")