2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
37 #include <rmr/RIC_message_types.h>
41 #cgo LDFLAGS: -lrmr_nng -lnng
47 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
48 "nanomsg.org/go/mangos/v2"
49 "nanomsg.org/go/mangos/v2/protocol/push"
50 _ "nanomsg.org/go/mangos/v2/transport/all"
51 "routing-manager/pkg/rtmgr"
58 NewSocket CreateNewNngSocketHandler
59 rcChan chan *xapp.RMRParams
62 func NewNngPush() *NngPush {
63 instance := new(NngPush)
64 instance.NewSocket = createNewPushSocket
68 func createNewPushSocket() (NngSocket, error) {
69 xapp.Logger.Debug("Invoked: createNewPushSocket()")
70 socket, err := push.NewSocket()
72 return nil, errors.New("can't create new push socket due to:" + err.Error())
74 socket.SetPipeEventHook(pipeEventHandler)
78 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
79 xapp.Logger.Debug("Invoked: pipeEventHandler()")
80 xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
81 for _, ep := range rtmgr.Eps {
82 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
83 if uri == pipe.Address() {
87 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
90 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
96 func (c *NngPush) Initialize(ip string) error {
100 func (c *NngPush) Terminate() error {
104 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
107 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
108 xapp.Logger.Debug("args: %v", *ep)
109 socket, err = c.NewSocket()
111 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
116 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
121 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
122 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
123 xapp.Logger.Debug("args: %v", *ep)
124 if err := ep.Socket.(NngSocket).Close(); err != nil {
125 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
130 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
131 c.updateEndpoints(rcs, c)
135 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
137 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
138 xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
139 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
140 options := make(map[string]interface{})
141 options[mangos.OptionDialAsynch] = true
142 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
143 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
148 func (c *NngPush) DistributeAll(policies *[]string) error {
149 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
150 xapp.Logger.Debug("args: %v", *policies)
151 for _, ep := range rtmgr.Eps {
155 go c.send(ep, policies)
158 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
159 time.Sleep(10 * time.Millisecond)
167 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
168 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
169 for _, pe := range *policies {
170 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
171 xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
174 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")