RT records are sent in a group rather than individually. Group is configurable
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 import (
32         "bytes"
33         "crypto/md5"
34         "errors"
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36         "routing-manager/pkg/rtmgr"
37         "strconv"
38         //"time"
39         "fmt"
40 )
41
42 type NngPush struct {
43         Sbi
44         rcChan chan *xapp.RMRParams
45 }
46
47 type RMRParams struct {
48         *xapp.RMRParams
49 }
50
51 func (params *RMRParams) String() string {
52         var b bytes.Buffer
53         sum := md5.Sum(params.Payload)
54         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
55         return b.String()
56 }
57
58 func NewNngPush() *NngPush {
59         instance := new(NngPush)
60         return instance
61 }
62
63 func (c *NngPush) Initialize(ip string) error {
64         return nil
65 }
66
67 func (c *NngPush) Terminate() error {
68         return nil
69 }
70
71 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
72
73         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
74         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
75         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
76         if ep.Whid < 0 {
77                 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
78         } else {
79                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
80         }
81
82         return nil
83 }
84
85 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
86         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
87         xapp.Logger.Debug("args: %v", *ep)
88
89         xapp.Rmr.Closewh(ep.Whid)
90         return nil
91 }
92
93 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
94         c.updateEndpoints(rcs, c)
95 }
96
97 func (c *NngPush) DistributeAll(policies *[]string) error {
98         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
99         xapp.Logger.Debug("args: %v", *policies)
100
101         for _, ep := range rtmgr.Eps {
102                 go c.send(ep, policies)
103         }
104
105         return nil
106 }
107
108 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
109         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
110
111         var policy = []byte{}
112         cumulative_policy := 0
113         count := 0
114         maxrecord := xapp.Config.GetInt("maxrecord")
115         if maxrecord == 0 {
116                 maxrecord = 10
117         }
118
119         for _, pe := range *policies {
120                 b := []byte(pe)
121                 for j := 0; j < len(b); j++ {
122                         policy = append(policy, b[j])
123                 }
124                 count++
125                 cumulative_policy++
126                 if count == maxrecord || cumulative_policy == len(*policies) {
127                         params := &RMRParams{&xapp.RMRParams{}}
128                         params.Mtype = 20
129                         params.PayloadLen = len(policy)
130                         params.Payload = []byte(policy)
131                         params.Mbuf = nil
132                         params.Whid = ep.Whid
133                         xapp.Rmr.SendMsg(params.RMRParams)
134                         count = 0
135                         policy = nil
136                         xapp.Logger.Debug("Sent message with payload len = %d", params.PayloadLen)
137                 }
138         }
139
140         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
141 }
142
143 func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
144         return c.createEndpoint(payload, c)
145 }
146
147 func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
148         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
149         xapp.Logger.Debug("args: %v", *policies)
150
151         go c.send(ep, policies)
152
153         return nil
154 }