2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "nanomsg.org/go/mangos/v2"
35 "nanomsg.org/go/mangos/v2/protocol/push"
36 _ "nanomsg.org/go/mangos/v2/transport/all"
37 "routing-manager/pkg/rtmgr"
44 NewSocket CreateNewNngSocketHandler
47 func NewNngPush() *NngPush {
48 instance := new(NngPush)
49 instance.NewSocket = createNewPushSocket
53 func createNewPushSocket() (NngSocket, error) {
54 xapp.Logger.Debug("Invoked: createNewPushSocket()")
55 socket, err := push.NewSocket()
57 return nil, errors.New("can't create new push socket due to:" + err.Error())
59 socket.SetPipeEventHook(pipeEventHandler)
63 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
64 xapp.Logger.Debug("Invoked: pipeEventHandler()")
65 xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
66 for _, ep := range rtmgr.Eps {
67 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
68 if uri == pipe.Address() {
72 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
75 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
81 func (c *NngPush) Initialize(ip string) error {
85 func (c *NngPush) Terminate() error {
89 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
92 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
93 xapp.Logger.Debug("args: %v", *ep)
94 socket, err = c.NewSocket()
96 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
101 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
106 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
107 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
108 xapp.Logger.Debug("args: %v", *ep)
109 if err := ep.Socket.(NngSocket).Close(); err != nil {
110 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
115 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
116 c.updateEndpoints(rcs, c)
120 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
122 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
123 xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
124 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
125 options := make(map[string]interface{})
126 options[mangos.OptionDialAsynch] = true
127 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
128 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
134 func (c *NngPush) DistributeAll(policies *[]string) error {
135 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
136 xapp.Logger.Debug("args: %v", *policies)
137 for _, ep := range rtmgr.Eps {
139 go c.send(ep, policies)
141 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
151 Temporary solution for R3 - E2M -> E2T issue
153 func (c *NngPush) DistributeAll(policies *[]string) error {
154 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
155 xapp.Logger.Debug("args: %v", *policies)
156 for _, ep := range rtmgr.Eps {
160 go c.send(ep, policies)
163 xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
164 time.Sleep(10 * time.Millisecond)
172 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
173 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
174 for _, pe := range *policies {
175 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
176 xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
179 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")