677a80c643de668a614ba3fd0fb712c6f3eedbe7
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 import (
32         "bytes"
33         "crypto/md5"
34         "errors"
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36         "routing-manager/pkg/rtmgr"
37         "strconv"
38         "time"
39         "fmt"
40 )
41
42 type NngPush struct {
43         Sbi
44         rcChan    chan *xapp.RMRParams
45 }
46
47 type RMRParams struct {
48         *xapp.RMRParams
49 }
50
51
52 func (params *RMRParams) String() string {
53         var b bytes.Buffer
54         sum := md5.Sum(params.Payload)
55         fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
56         return b.String()
57 }
58
59 func NewNngPush() *NngPush {
60         instance := new(NngPush)
61         return instance
62 }
63
64 func (c *NngPush) Initialize(ip string) error {
65         return nil
66 }
67
68 func (c *NngPush) Terminate() error {
69         return nil
70 }
71
72 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
73
74         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
75         endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
76         ep.Whid = int(xapp.Rmr.Openwh(endpoint))
77         if ep.Whid < 0   {
78                 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
79         }else {
80                 xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
81         }
82
83         return nil
84 }
85
86 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
87         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
88         xapp.Logger.Debug("args: %v", *ep)
89
90         xapp.Rmr.Closewh(ep.Whid)
91         return nil
92 }
93
94 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
95         c.updateEndpoints(rcs, c)
96 }
97
98 func (c *NngPush) DistributeAll(policies *[]string) error {
99         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
100         xapp.Logger.Debug("args: %v", *policies)
101
102         for _, ep := range rtmgr.Eps {
103                 go c.send(ep, policies)
104         }
105
106         return nil
107 }
108
109 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
110         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
111
112         for _, pe := range *policies {
113                 params := &RMRParams{&xapp.RMRParams{}}
114                 params.Mtype = 20
115                 params.PayloadLen = len([]byte(pe))
116                 params.Payload =[]byte(pe)
117                 params.Mbuf = nil
118                 params.Whid = ep.Whid
119                 time.Sleep(1 * time.Millisecond)
120                 xapp.Rmr.SendMsg(params.RMRParams)
121         }
122         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
123 }
124
125 func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
126         return c.createEndpoint(payload, c)
127 }
128
129 func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
130         xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
131         xapp.Logger.Debug("args: %v", *policies)
132
133         go c.send(ep, policies)
134
135         return nil
136 }
137