import (
"errors"
+ "fmt"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "net"
"routing-manager/pkg/rtmgr"
"strconv"
+ "strings"
)
-const DefaultNngPipelineSocketPrefix = "tcp://"
-const DefaultNngPipelineSocketNumber = 4561
+const DefaultRmrPipelineSocketPrefix = "tcp://"
+const DefaultRmrPipelineSocketNumber = 4561
const PlatformType = "platform"
var (
SupportedSbis = []*EngineConfig{
{
- Name: "nngpush",
+ Name: "rmrpush",
Version: "v1",
- Protocol: "nngpipeline",
- Instance: NewNngPush(),
+ Protocol: "rmrpipeline",
+ Instance: NewRmrPush(),
IsAvailable: true,
},
}
sbi.DeleteEndpoint(ep)
delete(rtmgr.Eps, ep.Uuid)
} else {
- rtmgr.Eps[ep.Uuid].Keepalive = false
+ if rtmgr.Eps[ep.Uuid] != nil {
+ rtmgr.Eps[ep.Uuid].Keepalive = false
+ }
}
}
}
Keepalive: true,
}
if err := sbi.AddEndpoint(ep); err != nil {
- xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ xapp.Logger.Error("Cannot create socket for endpoint: " + ep.Name + " due to:" + err.Error())
continue
}
rtmgr.Eps[uuid] = ep
}
}
s.updatePlatformEndpoints(&((*rcs).Pcs), sbi)
+ s.updateE2TEndpoints(&((*rcs).E2Ts), sbi)
s.pruneEndpointList(sbi)
}
rtmgr.Eps[uuid].Keepalive = true
} else {
ep := &rtmgr.Endpoint{
- Uuid: uuid,
- Name: pc.Name,
- XAppType: PlatformType,
- Ip: pc.Fqdn,
- Port: pc.Port,
- TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
- RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
- Socket: nil,
- IsReady: false,
- Keepalive: true,
+ Uuid: uuid,
+ Name: pc.Name,
+ XAppType: PlatformType,
+ Ip: pc.Fqdn,
+ Port: pc.Port,
+ //TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
+ //RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
+ }
+ xapp.Logger.Debug("ep created: %v", ep)
+ if err := sbi.AddEndpoint(ep); err != nil {
+ xapp.Logger.Error("Cannot create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ continue
+ }
+ rtmgr.Eps[uuid] = ep
+ }
+ }
+}
+
+func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) {
+ xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts)
+ for _, e2t := range *E2Ts {
+ uuid := e2t.Fqdn
+ stringSlice := strings.Split(e2t.Fqdn, ":")
+ ipaddress := stringSlice[0]
+ port, _ := strconv.Atoi(stringSlice[1])
+ if _, ok := rtmgr.Eps[uuid]; ok {
+ rtmgr.Eps[uuid].Keepalive = true
+ } else {
+ ep := &rtmgr.Endpoint{
+ Uuid: uuid,
+ Name: e2t.Name,
+ XAppType: PlatformType,
+ Ip: ipaddress,
+ Port: uint16(port),
+ //TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
+ //RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
}
xapp.Logger.Debug("ep created: %v", ep)
if err := sbi.AddEndpoint(ep); err != nil {
- xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ xapp.Logger.Error("Cannot create socket for endpoint: " + ep.Name + " due to:" + err.Error())
continue
}
rtmgr.Eps[uuid] = ep
}
}
}
+
+func (s *Sbi) checkEndpoint(payload string) *rtmgr.Endpoint {
+ /* Payload contains endpoint in the form of IP<domain name>:Port.
+ Port is data port of sender endpoint.
+ Eps contains the UUID in the form of IP<domain name>:Port.
+ Port is the Application Port(http) */
+
+ xapp.Logger.Debug("Invoked checkEndPoint %v", payload)
+ stringSlice := strings.Split(payload, " ")
+ uuid := stringSlice[0]
+ stringsubsplit := strings.Split(uuid, ":")
+ xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
+ for _, ep := range rtmgr.Eps {
+ if strings.Contains(ep.Uuid, stringsubsplit[0]) == true {
+ endpoint := rtmgr.Eps[ep.Uuid]
+ return endpoint
+ }
+ }
+
+ /* incase the stored Endpoint list is in the form of IP:port*/
+ addr, err := net.LookupIP(stringsubsplit[0])
+ if err == nil {
+ convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1])
+ xapp.Logger.Info(" IP:Port received is %s", convertedUuid)
+ IP := fmt.Sprintf("%s", addr[0])
+ for _, ep := range rtmgr.Eps {
+ res := strings.Contains(ep.Uuid, IP)
+ if res == true {
+ endpoint := rtmgr.Eps[ep.Uuid]
+ return endpoint
+ }
+ }
+ }
+ return nil
+}
+
+func (s *Sbi) createEndpoint(rmrsrc string) (*string, int) {
+ /* Create a new mapping, this case is assumed for multiple process sending RMR request from a container */
+ srcString := strings.Split(rmrsrc, " ")
+ srcStringSlice := strings.Split(srcString[0], "=")
+ Whid := int(xapp.Rmr.Openwh(srcStringSlice[1]))
+
+ xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s", Whid, srcStringSlice[1])
+ return &srcStringSlice[1], Whid
+}