Handling of subscription merge and inclusion of RMR lib from xapp-framework
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17
18    This source code is part of the near-RT RIC (RAN Intelligent Controller)
19    platform project (RICP).
20
21 ==================================================================================
22 */
23 /*
24   Mnemonic:     nngpipe.go
25   Abstract: mangos (NNG) Pipeline SBI implementation
26   Date:         12 March 2019
27 */
28
29 package sbi
30
31 /*
32 #include <time.h>
33 #include <stdlib.h>
34 #include <stdio.h>
35 #include <string.h>
36 #include <rmr/rmr.h>
37 #include <rmr/RIC_message_types.h>
38
39
40 #cgo CFLAGS: -I../
41 #cgo LDFLAGS: -lrmr_nng -lnng
42 */
43 import "C"
44
45 import (
46         "errors"
47         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
48         "nanomsg.org/go/mangos/v2"
49         "nanomsg.org/go/mangos/v2/protocol/push"
50         _ "nanomsg.org/go/mangos/v2/transport/all"
51         "routing-manager/pkg/rtmgr"
52         "strconv"
53 )
54
55
56
57 type NngPush struct {
58         Sbi
59         NewSocket CreateNewNngSocketHandler
60         rcChan      chan *xapp.RMRParams
61 }
62
63 func NewNngPush() *NngPush {
64         instance := new(NngPush)
65         instance.NewSocket = createNewPushSocket
66         return instance
67 }
68
69 func createNewPushSocket() (NngSocket, error) {
70         xapp.Logger.Debug("Invoked: createNewPushSocket()")
71         socket, err := push.NewSocket()
72         if err != nil {
73                 return nil, errors.New("can't create new push socket due to:" + err.Error())
74         }
75         socket.SetPipeEventHook(pipeEventHandler)
76         return socket, nil
77 }
78
79 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
80         xapp.Logger.Debug("Invoked: pipeEventHandler()")
81         xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
82         for _, ep := range rtmgr.Eps {
83                 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
84                 if uri == pipe.Address() {
85                         switch event {
86                         case 1:
87                                 ep.IsReady = true
88                                 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
89                         default:
90                                 ep.IsReady = false
91                                 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
92                         }
93                 }
94         }
95 }
96
97 func (c *NngPush) Initialize(ip string) error {
98         return nil
99 }
100
101 func (c *NngPush) Terminate() error {
102         return nil
103 }
104
105 func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
106         var err error
107         var socket NngSocket
108         xapp.Logger.Debug("Invoked sbi.AddEndpoint")
109         xapp.Logger.Debug("args: %v", *ep)
110         socket, err = c.NewSocket()
111         if err != nil {
112                 return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
113         }
114         ep.Socket = socket
115         err = c.dial(ep)
116         if err != nil {
117                 return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
118         }
119         return nil
120 }
121
122 func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
123         xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
124         xapp.Logger.Debug("args: %v", *ep)
125         if err := ep.Socket.(NngSocket).Close(); err != nil {
126                 return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
127         }
128         return nil
129 }
130
131 func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
132         c.updateEndpoints(rcs, c)
133 }
134
135 /*
136 NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
137 */
138 func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
139         xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
140         uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
141         options := make(map[string]interface{})
142         options[mangos.OptionDialAsynch] = true
143         if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
144                 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
145         }
146         return nil
147 }
148
149 func (c *NngPush) DistributeAll(policies *[]string) error {
150         xapp.Logger.Debug("Invoked: sbi.DistributeAll")
151         xapp.Logger.Debug("args: %v", *policies)
152         for _, ep := range rtmgr.Eps {
153                 if ep.IsReady {
154                         go c.send(ep, policies)
155                 } else {
156                         xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
157                 }
158         }
159         return nil
160 }
161
162 func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
163         xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
164         for _, pe := range *policies {
165                 if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
166                         xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
167                 }
168         }
169         xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
170 }