2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
37 #include <rmr/RIC_message_types.h>
41 #cgo LDFLAGS: -lrmr_nng -lnng
49 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
50 "nanomsg.org/go/mangos/v2"
51 "nanomsg.org/go/mangos/v2/protocol/push"
52 _ "nanomsg.org/go/mangos/v2/transport/all"
53 "routing-manager/pkg/rtmgr"
61 NewSocket CreateNewNngSocketHandler
62 rcChan chan *xapp.RMRParams
65 type RMRParams struct {
70 func (params *RMRParams) String() string {
72 sum := md5.Sum(params.Payload)
73 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
77 func NewNngPush() *NngPush {
78 instance := new(NngPush)
79 instance.NewSocket = createNewPushSocket
83 func createNewPushSocket() (NngSocket, error) {
84 xapp.Logger.Debug("Invoked: createNewPushSocket()")
85 socket, err := push.NewSocket()
87 return nil, errors.New("can't create new push socket due to:" + err.Error())
89 socket.SetPipeEventHook(pipeEventHandler)
93 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
94 xapp.Logger.Debug("Invoked: pipeEventHandler()")
95 xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
96 for _, ep := range rtmgr.Eps {
97 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
98 if uri == pipe.Address() {
102 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
105 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
111 func (c *NngPush) Initialize(ip string) error {
115 func (c *NngPush) Terminate() error {
119 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
121 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
122 xapp.Logger.Debug("args: %v", *ep)
123 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
124 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
126 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
128 xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
134 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
135 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
136 xapp.Logger.Debug("args: %v", *ep)
138 xapp.Rmr.Closewh(ep.Whid)
142 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
143 c.updateEndpoints(rcs, c)
147 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
149 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
150 xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
151 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
152 options := make(map[string]interface{})
153 options[mangos.OptionDialAsynch] = true
154 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
155 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
160 func (c *NngPush) DistributeAll(policies *[]string) error {
161 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
162 xapp.Logger.Debug("args: %v", *policies)
164 for _, ep := range rtmgr.Eps {
165 go c.send(ep, policies)
171 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
172 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
174 for _, pe := range *policies {
175 params := &RMRParams{&xapp.RMRParams{}}
177 params.PayloadLen = len([]byte(pe))
178 params.Payload =[]byte(pe)
180 params.Whid = ep.Whid
181 time.Sleep(1 * time.Millisecond)
182 xapp.Rmr.SendMsg(params.RMRParams)
184 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
187 func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
188 return c.createEndpoint(payload, c)
191 func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
192 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
193 xapp.Logger.Debug("args: %v", *policies)
195 go c.send(ep, policies)