RIC:1060: Change in PTL
[ric-plt/rtmgr.git] / pkg / sbi / sbi.go
1 /*
2 w
3 ==================================================================================
4   Copyright (c) 2019 AT&T Intellectual Property.
5   Copyright (c) 2019 Nokia
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     sbi.go
26   Abstract:     Contains SBI (SouthBound Interface) module definitions and generic SBI components
27   Date:         16 March 2019
28 */
29
30 package sbi
31
32 import (
33         "errors"
34         "fmt"
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36         "net"
37         "routing-manager/pkg/rtmgr"
38         "strconv"
39         "strings"
40 )
41
42 const DefaultRmrPipelineSocketPrefix = "tcp://"
43 const DefaultRmrPipelineSocketNumber = 4561
44 const PlatformType = "platform"
45
46 var (
47         SupportedSbis = []*EngineConfig{
48                 {
49                         Name:        "rmrpush",
50                         Version:     "v1",
51                         Protocol:    "rmrpipeline",
52                         Instance:    NewRmrPush(),
53                         IsAvailable: true,
54                 },
55         }
56 )
57
58 func GetSbi(sbiName string) (Engine, error) {
59         for _, sbi := range SupportedSbis {
60                 if sbi.Name == sbiName && sbi.IsAvailable {
61                         return sbi.Instance, nil
62                 }
63         }
64         return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
65 }
66
67 type Sbi struct {
68 }
69
70 func (s *Sbi) pruneEndpointList(sbi Engine) {
71         xapp.Logger.Debug("pruneEndpointList invoked.")
72         for _, ep := range rtmgr.Eps {
73                 if !ep.Keepalive {
74                         xapp.Logger.Debug("deleting %v", ep)
75                         sbi.DeleteEndpoint(ep)
76                         delete(rtmgr.Eps, ep.Uuid)
77                 } else {
78                         if rtmgr.Eps[ep.Uuid] != nil {
79                                 rtmgr.Eps[ep.Uuid].Keepalive = false
80                         }
81                 }
82         }
83 }
84
85 func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) {
86         for _, xapps := range (*rcs).XApps {
87                 for _, instance := range xapps.Instances {
88                         uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
89                         if _, ok := rtmgr.Eps[uuid]; ok {
90                                 rtmgr.Eps[uuid].Keepalive = true
91                         } else {
92                                 ep := &rtmgr.Endpoint{
93                                         Uuid:       uuid,
94                                         Name:       instance.Name,
95                                         XAppType:   xapps.Name,
96                                         Ip:         instance.Ip,
97                                         Port:       instance.Port,
98                                         TxMessages: instance.TxMessages,
99                                         RxMessages: instance.RxMessages,
100                                         Policies:   instance.Policies,
101                                         Socket:     nil,
102                                         IsReady:    false,
103                                         Keepalive:  true,
104                                 }
105                                 if err := sbi.AddEndpoint(ep); err != nil {
106                                         xapp.Logger.Error("Cannot create socket for endpoint: " + ep.Name + " due to:" + err.Error())
107                                         continue
108                                 }
109                                 rtmgr.Eps[uuid] = ep
110                         }
111                 }
112         }
113         s.updatePlatformEndpoints(&((*rcs).Pcs), sbi)
114         s.updateE2TEndpoints(&((*rcs).E2Ts), sbi)
115         s.pruneEndpointList(sbi)
116 }
117
118 func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) {
119         xapp.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs)
120         for _, pc := range *pcs {
121                 uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port))
122                 if _, ok := rtmgr.Eps[uuid]; ok {
123                         rtmgr.Eps[uuid].Keepalive = true
124                 } else {
125                         ep := &rtmgr.Endpoint{
126                                 Uuid:     uuid,
127                                 Name:     pc.Name,
128                                 XAppType: PlatformType,
129                                 Ip:       pc.Fqdn,
130                                 Port:     pc.Port,
131                                 //TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
132                                 //RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
133                                 Socket:    nil,
134                                 IsReady:   false,
135                                 Keepalive: true,
136                         }
137                         xapp.Logger.Debug("ep created: %v", ep)
138                         if err := sbi.AddEndpoint(ep); err != nil {
139                                 xapp.Logger.Error("Cannot create socket for endpoint: " + ep.Name + " due to:" + err.Error())
140                                 continue
141                         }
142                         rtmgr.Eps[uuid] = ep
143                 }
144         }
145 }
146
147 func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) {
148         xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts)
149         for _, e2t := range *E2Ts {
150                 uuid := e2t.Fqdn
151                 stringSlice := strings.Split(e2t.Fqdn, ":")
152                 ipaddress := stringSlice[0]
153                 port, _ := strconv.Atoi(stringSlice[1])
154                 if _, ok := rtmgr.Eps[uuid]; ok {
155                         rtmgr.Eps[uuid].Keepalive = true
156                 } else {
157                         ep := &rtmgr.Endpoint{
158                                 Uuid:     uuid,
159                                 Name:     e2t.Name,
160                                 XAppType: PlatformType,
161                                 Ip:       ipaddress,
162                                 Port:     uint16(port),
163                                 //TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
164                                 //RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
165                                 Socket:    nil,
166                                 IsReady:   false,
167                                 Keepalive: true,
168                         }
169                         xapp.Logger.Debug("ep created: %v", ep)
170                         if err := sbi.AddEndpoint(ep); err != nil {
171                                 xapp.Logger.Error("Cannot create socket for endpoint: " + ep.Name + " due to:" + err.Error())
172                                 continue
173                         }
174                         rtmgr.Eps[uuid] = ep
175                 }
176         }
177 }
178
179 func (s *Sbi) checkEndpoint(payload string) *rtmgr.Endpoint {
180         /* Payload contains endpoint in the form of IP<domain name>:Port.
181         Port is data port of sender endpoint.
182         Eps contains the UUID in the form of IP<domain name>:Port.
183         Port is the Application Port(http) */
184
185         xapp.Logger.Debug("Invoked checkEndPoint %v", payload)
186         stringSlice := strings.Split(payload, " ")
187         uuid := stringSlice[0]
188         stringsubsplit := strings.Split(uuid, ":")
189         xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
190         for _, ep := range rtmgr.Eps {
191                 if strings.Contains(ep.Uuid, stringsubsplit[0]) == true {
192                         endpoint := rtmgr.Eps[ep.Uuid]
193                         return endpoint
194                 }
195         }
196
197         /* incase the stored Endpoint list is in the form of IP:port*/
198         addr, err := net.LookupIP(stringsubsplit[0])
199         if err == nil {
200                 convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1])
201                 xapp.Logger.Info(" IP:Port received is %s", convertedUuid)
202                 IP := fmt.Sprintf("%s", addr[0])
203                 for _, ep := range rtmgr.Eps {
204                         res := strings.Contains(ep.Uuid, IP)
205                         if res == true {
206                                 endpoint := rtmgr.Eps[ep.Uuid]
207                                 return endpoint
208                         }
209                 }
210         }
211         return nil
212 }
213
214 func (s *Sbi) createEndpoint(rmrsrc string) (*string, int) {
215         /* Create a new mapping, this case is assumed for multiple process sending RMR request from a container */
216         srcString := strings.Split(rmrsrc, " ")
217         srcStringSlice := strings.Split(srcString[0], "=")
218         Whid := int(xapp.Rmr.Openwh(srcStringSlice[1]))
219
220         xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s", Whid, srcStringSlice[1])
221         return &srcStringSlice[1], Whid
222 }