Making Route Distribution Synchronous
[ric-plt/rtmgr.git] / pkg / sbi / sbi.go
1 /*
2
3 ==================================================================================
4   Copyright (c) 2019 AT&T Intellectual Property.
5   Copyright (c) 2019 Nokia
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11        http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18
19    This source code is part of the near-RT RIC (RAN Intelligent Controller)
20    platform project (RICP).
21
22 ==================================================================================
23 */
24 /*
25   Mnemonic:     sbi.go
26   Abstract:     Contains SBI (SouthBound Interface) module definitions and generic SBI components
27   Date:         16 March 2019
28 */
29
30 package sbi
31
32 import (
33         "errors"
34         "fmt"
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36         "net"
37         "routing-manager/pkg/rtmgr"
38         "strconv"
39         "strings"
40 )
41
// Package-level defaults and labels used by the SBI module.
const (
	// DefaultRmrPipelineSocketPrefix is the default prefix string for RMR
	// pipeline socket addresses.
	DefaultRmrPipelineSocketPrefix = "tcp://"

	// DefaultRmrPipelineSocketNumber is the default RMR pipeline socket
	// number (presumably a TCP port; confirm against the users of this value).
	DefaultRmrPipelineSocketNumber = 4561

	// PlatformType is the XAppType value assigned to endpoints created for
	// platform components and E2T instances.
	PlatformType = "platform"
)
45
46 var (
47         SupportedSbis = []*EngineConfig{
48                 {
49                         Name:        "rmrpush",
50                         Version:     "v1",
51                         Protocol:    "rmrpipeline",
52                         Instance:    NewRmrPush(),
53                         IsAvailable: true,
54                 },
55         }
56 )
57
58 func GetSbi(sbiName string) (Engine, error) {
59         for _, sbi := range SupportedSbis {
60                 if sbi.Name == sbiName && sbi.IsAvailable {
61                         return sbi.Instance, nil
62                 }
63         }
64         return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
65 }
66
// Sbi carries no fields; it serves as the receiver for the endpoint
// bookkeeping methods defined in this file.
type Sbi struct{}
69
70 func (s *Sbi) pruneEndpointList(sbi Engine) {
71         xapp.Logger.Debug("pruneEndpointList invoked.")
72         for _, ep := range rtmgr.Eps {
73                 if !ep.Keepalive {
74                         xapp.Logger.Debug("deleting %v", ep)
75                         sbi.DeleteEndpoint(ep)
76                         delete(rtmgr.Eps, ep.Uuid)
77                 } else {
78                         rtmgr.Eps[ep.Uuid].Keepalive = false
79                 }
80         }
81 }
82
83 func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) {
84         for _, xapps := range (*rcs).XApps {
85                 for _, instance := range xapps.Instances {
86                         uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
87                         if _, ok := rtmgr.Eps[uuid]; ok {
88                                 rtmgr.Eps[uuid].Keepalive = true
89                         } else {
90                                 ep := &rtmgr.Endpoint{
91                                         Uuid:       uuid,
92                                         Name:       instance.Name,
93                                         XAppType:   xapps.Name,
94                                         Ip:         instance.Ip,
95                                         Port:       instance.Port,
96                                         TxMessages: instance.TxMessages,
97                                         RxMessages: instance.RxMessages,
98                                         Policies:   instance.Policies,
99                                         Socket:     nil,
100                                         IsReady:    false,
101                                         Keepalive:  true,
102                                 }
103                                 if err := sbi.AddEndpoint(ep); err != nil {
104                                         xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
105                                         continue
106                                 }
107                                 rtmgr.Eps[uuid] = ep
108                         }
109                 }
110         }
111         s.updatePlatformEndpoints(&((*rcs).Pcs), sbi)
112         s.updateE2TEndpoints(&((*rcs).E2Ts), sbi)
113         s.pruneEndpointList(sbi)
114 }
115
116 func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) {
117         xapp.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs)
118         for _, pc := range *pcs {
119                 uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port))
120                 if _, ok := rtmgr.Eps[uuid]; ok {
121                         rtmgr.Eps[uuid].Keepalive = true
122                 } else {
123                         ep := &rtmgr.Endpoint{
124                                 Uuid:       uuid,
125                                 Name:       pc.Name,
126                                 XAppType:   PlatformType,
127                                 Ip:         pc.Fqdn,
128                                 Port:       pc.Port,
129                                 TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
130                                 RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
131                                 Socket:     nil,
132                                 IsReady:    false,
133                                 Keepalive:  true,
134                         }
135                         xapp.Logger.Debug("ep created: %v", ep)
136                         if err := sbi.AddEndpoint(ep); err != nil {
137                                 xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
138                                 continue
139                         }
140                         rtmgr.Eps[uuid] = ep
141                 }
142         }
143 }
144
145 func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) {
146         xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts)
147         for _, e2t := range *E2Ts {
148                 uuid := e2t.Fqdn
149                 stringSlice := strings.Split(e2t.Fqdn, ":")
150                 ipaddress := stringSlice[0]
151                 port, _ := strconv.Atoi(stringSlice[1])
152                 if _, ok := rtmgr.Eps[uuid]; ok {
153                         rtmgr.Eps[uuid].Keepalive = true
154                 } else {
155                         ep := &rtmgr.Endpoint{
156                                 Uuid:       uuid,
157                                 Name:       e2t.Name,
158                                 XAppType:   PlatformType,
159                                 Ip:         ipaddress,
160                                 Port:       uint16(port),
161                                 TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
162                                 RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
163                                 Socket:     nil,
164                                 IsReady:    false,
165                                 Keepalive:  true,
166                         }
167                         xapp.Logger.Debug("ep created: %v", ep)
168                         if err := sbi.AddEndpoint(ep); err != nil {
169                                 xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
170                                 continue
171                         }
172                         rtmgr.Eps[uuid] = ep
173                 }
174         }
175 }
176
177 func (s *Sbi) createEndpoint(payload string, sbi Engine) *rtmgr.Endpoint {
178         xapp.Logger.Debug("CreateEndPoint %v", payload)
179         stringSlice := strings.Split(payload, " ")
180         uuid := stringSlice[0]
181         xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
182
183         if _, ok := rtmgr.Eps[uuid]; ok {
184                 ep := rtmgr.Eps[uuid]
185                 return ep
186         }
187
188         /* incase the stored Endpoint list is in the form of IP:port*/
189         stringsubsplit := strings.Split(uuid, ":")
190         addr, err := net.LookupIP(stringsubsplit[0])
191         if err == nil {
192                 convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1])
193                 xapp.Logger.Info(" IP:Port received is %s", convertedUuid)
194                 if _, ok := rtmgr.Eps[convertedUuid]; ok {
195                         ep := rtmgr.Eps[convertedUuid]
196                         return ep
197                 }
198         }
199
200         return nil
201 }