2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "nanomsg.org/go/mangos/v2"
35 "nanomsg.org/go/mangos/v2/protocol/push"
36 _ "nanomsg.org/go/mangos/v2/transport/all"
37 "routing-manager/pkg/rtmgr"
// NewSocket is the socket factory for this SBI: NewNngPush sets it to
// createNewPushSocket and AddEndpoint calls it to obtain a socket.
43 NewSocket CreateNewNngSocketHandler
// NewNngPush allocates a zero-valued NngPush and installs
// createNewPushSocket as its NewSocket factory.
46 func NewNngPush() *NngPush {
47 instance := new(NngPush)
// Socket creation is routed through this field (AddEndpoint calls
// c.NewSocket()), so the factory can be reassigned after construction.
48 instance.NewSocket = createNewPushSocket
// createNewPushSocket creates a mangos PUSH socket via push.NewSocket and
// registers pipeEventHandler as the socket's pipe event hook.
//
// When push.NewSocket fails it returns a nil socket together with a new
// error whose text embeds the cause's message.
52 func createNewPushSocket() (NngSocket, error) {
53 xapp.Logger.Debug("Invoked: createNewPushSocket()")
54 socket, err := push.NewSocket()
// The cause is flattened into the message string (errors.New), so callers
// cannot recover the original error value with errors.Is / errors.As.
56 return nil, errors.New("can't create new push socket due to:" + err.Error())
// From here on, pipe events on this socket are delivered to pipeEventHandler.
58 socket.SetPipeEventHook(pipeEventHandler)
// pipeEventHandler is the pipe event hook that createNewPushSocket installs
// on every push socket.
//
// It logs the address of the pipe the event refers to, then ranges over
// rtmgr.Eps and, for each endpoint whose computed URI equals that address,
// logs that the endpoint was attached or detached.
//
// NOTE(review): presumably the attached/detached branch is selected from
// event and also updates a readiness flag on the endpoint — confirm.
62 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
63 xapp.Logger.Debug("Invoked: pipeEventHandler()")
64 xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
65 for _, ep := range rtmgr.Eps {
// Same URI construction as in dial, so a pipe address can be matched back
// to the endpoint that was dialed.
66 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
67 if uri == pipe.Address() {
71 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
74 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
// Initialize takes an ip string and reports failure through the returned error.
// NOTE(review): presumably there is nothing to set up here because sockets
// are created per endpoint in AddEndpoint — confirm against the body.
80 func (c *NngPush) Initialize(ip string) error {
// Terminate takes no arguments and reports failure through the returned error.
// NOTE(review): presumably the counterpart of Initialize; per-endpoint
// sockets are closed in DeleteEndpoint — confirm what, if anything, is
// released here.
84 func (c *NngPush) Terminate() error {
// AddEndpoint obtains a socket for ep from the c.NewSocket factory and
// reports failures as errors that name the endpoint's Uuid.
//
// Two failure paths exist: one for socket creation ("can't add new socket")
// and one for dialing ("can't dial to endpoint"). In both, the cause is
// flattened into the message text.
//
// NOTE(review): presumably the new socket is stored on ep and c.dial(ep)
// runs before the second error is returned — confirm.
88 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
91 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
// Dereferences ep: a nil endpoint panics here.
92 xapp.Logger.Debug("args: %v", *ep)
93 socket, err = c.NewSocket()
95 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
100 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
// DeleteEndpoint closes the socket held in ep.Socket. If Close fails it
// returns an error that names the endpoint's Uuid and embeds the cause's
// message.
105 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
106 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
// Dereferences ep: a nil endpoint panics here.
107 xapp.Logger.Debug("args: %v", *ep)
// NOTE(review): single-value type assertion — this panics if ep.Socket is
// nil or does not hold an NngSocket (e.g. an endpoint that was never
// successfully added).
108 if err := ep.Socket.(NngSocket).Close(); err != nil {
109 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
// UpdateEndpoints forwards rcs to c.updateEndpoints, passing c itself as the
// second argument.
// NOTE(review): updateEndpoints is defined outside the visible code;
// presumably it uses c's AddEndpoint/DeleteEndpoint to reconcile endpoints
// with rcs — confirm.
114 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
115 c.updateEndpoints(rcs, c)
119 NOTE: Asynchronous dial starts a goroutine which maintains the connection to the given endpoint
// dial connects ep's socket to the endpoint's URI, built from
// DefaultNngPipelineSocketPrefix, ep.Ip and DefaultNngPipelineSocketNumber,
// with the mangos asynchronous-dial option enabled.
//
// If DialOptions fails it returns an error that names the URI and embeds the
// cause's message.
121 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
122 xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
// pipeEventHandler rebuilds this exact URI to match pipe events to endpoints.
123 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
124 options := make(map[string]interface{})
// Asynchronous dial: the call does not wait for the peer to be reachable.
125 options[mangos.OptionDialAsynch] = true
// NOTE(review): single-value type assertion — panics if ep.Socket is nil or
// does not hold an NngSocket.
126 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
127 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
// DistributeAll ranges over rtmgr.Eps and, per endpoint, either starts a
// goroutine running c.send(ep, policies) or logs a warning that the endpoint
// is not ready.
//
// NOTE(review): presumably the choice is made on a readiness flag of ep that
// pipeEventHandler maintains — confirm.
132 func (c *NngPush) DistributeAll(policies *[]string) error {
133 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
// Dereferences policies: a nil pointer panics here.
134 xapp.Logger.Debug("args: %v", *policies)
135 for _, ep := range rtmgr.Eps {
// Fire-and-forget: send has no return value and nothing visible here waits
// for the goroutine, so delivery failures surface only in send's own logs.
137 go c.send(ep, policies)
139 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
// send writes each entry of *policies to ep's socket as a separate message.
// A failed Send is logged at Error level; afterwards an Info line reports
// "OK" together with the total number of entries in *policies.
//
// NOTE(review): confirm whether the loop stops after the first failed Send;
// otherwise the final "OK" line is logged even when entries were not
// delivered.
145 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
146 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
147 for _, pe := range *policies {
// NOTE(review): single-value type assertion — panics if ep.Socket is nil or
// does not hold an NngSocket. []byte(pe) copies the entry for each message.
148 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
149 xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
152 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")