2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
37 #include <rmr/RIC_message_types.h>
41 #cgo LDFLAGS: -lrmr_nng -lnng
47 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
48 "nanomsg.org/go/mangos/v2"
49 "nanomsg.org/go/mangos/v2/protocol/push"
50 _ "nanomsg.org/go/mangos/v2/transport/all"
51 "routing-manager/pkg/rtmgr"
// NewSocket is the socket factory; AddEndpoint calls it to obtain the socket
// for each endpoint, and NewNngPush sets it to createNewPushSocket.
59 NewSocket CreateNewNngSocketHandler
// rcChan carries *xapp.RMRParams values.
// NOTE(review): no reader or writer of this channel is visible in this excerpt.
60 rcChan chan *xapp.RMRParams
// NewNngPush allocates a zero-valued NngPush and installs createNewPushSocket
// as its NewSocket factory.
// NOTE(review): the statement returning the instance is not visible in this excerpt.
63 func NewNngPush() *NngPush {
64 instance := new(NngPush)
// Default factory; AddEndpoint obtains sockets through this field.
65 instance.NewSocket = createNewPushSocket
// createNewPushSocket creates a mangos PUSH socket via push.NewSocket and
// registers pipeEventHandler as its pipe event hook.
// On the failure path it returns a nil socket together with an error whose
// message embeds the underlying error text.
// NOTE(review): the err check guarding the failure return and the success
// return are not visible in this excerpt.
69 func createNewPushSocket() (NngSocket, error) {
70 xapp.Logger.Debug("Invoked: createNewPushSocket()")
71 socket, err := push.NewSocket()
73 return nil, errors.New("can't create new push socket due to:" + err.Error())
// Hook pipe events so pipeEventHandler is called for this socket's pipes.
75 socket.SetPipeEventHook(pipeEventHandler)
// pipeEventHandler is the pipe event hook installed by createNewPushSocket.
// It logs the pipe address, then scans rtmgr.Eps for endpoints whose computed
// URI equals that address and logs whether the endpoint was attached or detached.
// NOTE(review): the code that inspects `event` to choose between the
// "attached" and "detached" branches (and any endpoint state it updates) is
// not visible in this excerpt.
79 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
80 xapp.Logger.Debug("Invoked: pipeEventHandler()")
81 xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
82 for _, ep := range rtmgr.Eps {
// Rebuild the endpoint URI the same way dial() builds it, so it can be
// compared with the pipe's address.
83 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
84 if uri == pipe.Address() {
88 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
91 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
// Initialize takes an ip string and reports failure through its error result.
// NOTE(review): the body is not visible in this excerpt.
97 func (c *NngPush) Initialize(ip string) error {
// Terminate takes no arguments and reports failure through its error result.
// NOTE(review): the body is not visible in this excerpt.
101 func (c *NngPush) Terminate() error {
// AddEndpoint obtains a new socket from c.NewSocket for the endpoint ep.
// It returns an error naming ep.Uuid when the socket cannot be created or
// when dialing the endpoint fails; each message embeds the underlying error text.
// ep is dereferenced for logging, so a nil ep panics.
// NOTE(review): the declarations of socket/err, the err checks, the assignment
// of the socket to the endpoint and the dial call itself are not visible in
// this excerpt.
105 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
108 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
109 xapp.Logger.Debug("args: %v", *ep)
// Use the injected factory rather than calling createNewPushSocket directly.
110 socket, err = c.NewSocket()
112 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
117 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
// DeleteEndpoint closes the socket stored in ep.Socket.
// It returns an error naming ep.Uuid when Close fails.
// ep is dereferenced for logging, so a nil ep panics.
// NOTE(review): the success return is not visible in this excerpt.
122 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
123 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
124 xapp.Logger.Debug("args: %v", *ep)
// Single-value type assertion: this panics if ep.Socket is nil or does not
// hold an NngSocket.
125 if err := ep.Socket.(NngSocket).Close(); err != nil {
126 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
// UpdateEndpoints forwards rcs to c.updateEndpoints, passing c itself as the
// second argument.
// NOTE(review): updateEndpoints is defined outside this excerpt; presumably it
// comes from the embedded Sbi type — confirm.
131 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
132 c.updateEndpoints(rcs, c)
136 NOTE: Asynchronous dial starts a goroutine which keeps maintaining the connection to the given endpoint
// dial connects the endpoint's socket to the endpoint URI, which is built from
// DefaultNngPipelineSocketPrefix, ep.Ip and DefaultNngPipelineSocketNumber.
// The dial is requested with mangos.OptionDialAsynch set to true.
// It returns an error naming the URI when DialOptions fails.
// NOTE(review): the success return is not visible in this excerpt.
138 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
139 xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
140 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
141 options := make(map[string]interface{})
// Ask for an asynchronous dial.
142 options[mangos.OptionDialAsynch] = true
// Single-value type assertion: this panics if ep.Socket is nil or does not
// hold an NngSocket.
143 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
144 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
// DistributeAll walks rtmgr.Eps and, for each endpoint, either starts a
// goroutine running c.send(ep, policies) or logs a warning that the endpoint
// is not ready.
// policies is dereferenced for logging, so a nil policies panics.
// NOTE(review): the readiness condition choosing between the two branches and
// the final return are not visible in this excerpt.
149 func (c *NngPush) DistributeAll(policies *[]string) error {
150 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
151 xapp.Logger.Debug("args: %v", *policies)
152 for _, ep := range rtmgr.Eps {
// Each send runs in its own goroutine; nothing visible here waits for it.
154 go c.send(ep, policies)
156 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
// send pushes every entry of *policies to the endpoint's socket, one Send call
// per entry, converting each string to bytes.
// A failed Send is logged at error level; nothing is returned to the caller.
// It also logs an info line with the endpoint UUID and the number of entries
// in *policies.
// NOTE(review): whether the loop stops after a failed Send is not visible in
// this excerpt — confirm the "OK" line cannot be logged after a failure.
162 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
163 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
164 for _, pe := range *policies {
// Single-value type assertion: this panics if ep.Socket is nil or does not
// hold an NngSocket.
165 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
166 xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
// The count is len(*policies), i.e. the number of entries offered, not the
// number of successful sends.
169 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")