2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
21 ==================================================================================
25 Abstract: mangos (NNG) Pipeline SBI implementation
35 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36 "routing-manager/pkg/rtmgr"
44 rcChan chan *xapp.RMRParams
// RMRParams is a package-local struct type that carries a String method.
// NOTE(review): the field list is not visible in this excerpt. send builds a
// value as RMRParams{&xapp.RMRParams{}} and reads params.RMRParams, which
// suggests a single embedded *xapp.RMRParams — confirm against the full file.
47 type RMRParams struct {
// String writes a one-line description of the parameters: the Src, Mtype,
// SubId, Xid and Meid.RanName fields, the PayloadLen field next to the actual
// len(Payload), and the MD5 digest of Payload rendered with %x.
// NOTE(review): the declaration of b and the return statement are not visible
// in this excerpt.
51 func (params *RMRParams) String() string {
// md5.Sum returns a fixed-size array; it is only passed to the %x verb below.
53 sum := md5.Sum(params.Payload)
54 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
// NewNngPush allocates a zero-valued NngPush with new.
// NOTE(review): the rest of the body (presumably returning instance) is not
// visible in this excerpt.
58 func NewNngPush() *NngPush {
59 instance := new(NngPush)
// Initialize takes an ip string and returns an error.
// NOTE(review): the body is not visible in this excerpt, so what it does with
// ip cannot be stated here.
63 func (c *NngPush) Initialize(ip string) error {
// Terminate takes no arguments and returns an error.
// NOTE(review): the body is not visible in this excerpt.
67 func (c *NngPush) Terminate() error {
// AddEndpoint calls xapp.Rmr.Openwh on "<ep.Ip>:<DefaultNngPipelineSocketNumber>"
// and stores the result, converted to int, in ep.Whid. One path returns an
// error naming ep.Uuid and the wormhole id; the other logs the id and the
// endpoint address at debug level.
// NOTE(review): the condition that selects between those two paths, and the
// final return, are not visible in this excerpt.
71 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
73 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
74 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
75 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
// strconv.Itoa renders the id in decimal. A string(int) conversion would
// instead produce the UTF-8 encoding of that code point (U+FFFD for negative
// values), leaving the id unreadable in the error text.
77 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + strconv.Itoa(ep.Whid))
79 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
// DeleteEndpoint logs the endpoint at debug level and passes ep.Whid to
// xapp.Rmr.Closewh.
// NOTE(review): the returned error value is not visible in this excerpt.
85 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
86 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
// ep is dereferenced here, so a nil ep panics before Closewh is reached.
87 xapp.Logger.Debug("args: %v", *ep)
89 xapp.Rmr.Closewh(ep.Whid)
// UpdateEndpoints forwards rcs to c.updateEndpoints, passing c itself as the
// second argument.
// NOTE(review): updateEndpoints is not defined in this excerpt; presumably it
// is promoted from an embedded type — confirm.
93 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
94 c.updateEndpoints(rcs, c)
// DistributeAll starts one goroutine per entry of rtmgr.Eps, each running
// c.send with that entry and the same policies pointer.
// NOTE(review): no visible line waits for those goroutines, and the returned
// error value is not visible in this excerpt.
97 func (c *NngPush) DistributeAll(policies *[]string) error {
98 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
// policies is dereferenced here, so a nil pointer panics before any send.
99 xapp.Logger.Debug("args: %v", *policies)
101 for _, ep := range rtmgr.Eps {
// ep is passed as an argument, so each goroutine gets its own copy of the
// loop value.
102 go c.send(ep, policies)
// send delivers the policy strings to one endpoint. The visible code appends
// bytes to policy; when count equals the "maxrecord" config value, or
// cumulative_policy equals len(*policies), it wraps the accumulated bytes in
// an RMRParams addressed to ep.Whid and hands it to xapp.Rmr.SendMsg.
// NOTE(review): the declarations and updates of count, b and
// cumulative_policy, any reset of policy after a send, and the remaining
// params fields are not visible in this excerpt.
108 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
109 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
111 var policy = []byte{}
112 cumulative_policy := 0
// Batch-size threshold read from configuration under the key "maxrecord".
114 maxrecord := xapp.Config.GetInt("maxrecord")
119 for _, pe := range *policies {
// NOTE(review): this element-by-element copy is equivalent to
// append(policy, b...).
121 for j := 0; j < len(b); j++ {
122 policy = append(policy, b[j])
// Flush when the batch threshold is hit or the last policy has been counted.
126 if count == maxrecord || cumulative_policy == len(*policies) {
127 params := &RMRParams{&xapp.RMRParams{}}
129 params.PayloadLen = len(policy)
// NOTE(review): policy is already a []byte, so this conversion is a no-op.
130 params.Payload = []byte(policy)
132 params.Whid = ep.Whid
// NOTE(review): no visible line inspects the result of SendMsg.
133 xapp.Rmr.SendMsg(params.RMRParams)
136 xapp.Logger.Debug("Sent message with payload len = %d to %s", params.PayloadLen, ep.Uuid)
// Reports the total number of policy strings, not the number of messages sent.
140 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
// CreateEndpoint returns the result of c.createEndpoint, called with payload
// and with c itself as the second argument.
// NOTE(review): createEndpoint is not defined in this excerpt; presumably it
// is promoted from an embedded type — confirm.
143 func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
144 return c.createEndpoint(payload, c)
// DistributeToEp starts a single goroutine running c.send for the given
// endpoint and policies.
// NOTE(review): no visible line waits for that goroutine, and the returned
// error value is not visible in this excerpt.
147 func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
148 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
// policies is dereferenced here, so a nil pointer panics before the send.
149 xapp.Logger.Debug("args: %v", *policies)
151 go c.send(ep, policies)