X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fsbi%2Fsbi.go;h=21ff78eb1d0382083d8258a74de50db1496d7fe4;hb=6a9ce4976c1fbbcb8d4a068f81e76515c99a9d3f;hp=58fe7d8e64b94046071652f78ca2ec8033948c8e;hpb=dd6b05676beaff9d6252c3486cf8fb77748d37f8;p=ric-plt%2Frtmgr.git diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 58fe7d8..21ff78e 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -31,23 +31,25 @@ package sbi import ( "errors" + //"fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + //"net" "routing-manager/pkg/rtmgr" "strconv" "strings" ) -const DefaultNngPipelineSocketPrefix = "tcp://" -const DefaultNngPipelineSocketNumber = 4561 +const DefaultRmrPipelineSocketPrefix = "tcp://" +const DefaultRmrPipelineSocketNumber = 4561 const PlatformType = "platform" var ( SupportedSbis = []*EngineConfig{ { - Name: "nngpush", + Name: "rmrpush", Version: "v1", - Protocol: "nngpipeline", - Instance: NewNngPush(), + Protocol: "rmrpipeline", + Instance: NewRmrPush(), IsAvailable: true, }, } @@ -73,7 +75,9 @@ func (s *Sbi) pruneEndpointList(sbi Engine) { sbi.DeleteEndpoint(ep) delete(rtmgr.Eps, ep.Uuid) } else { - rtmgr.Eps[ep.Uuid].Keepalive = false + if rtmgr.Eps[ep.Uuid] != nil { + rtmgr.Eps[ep.Uuid].Keepalive = false + } } } } @@ -107,7 +111,7 @@ func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) { } } s.updatePlatformEndpoints(&((*rcs).Pcs), sbi) - s.updateE2TEndpoints(&((*rcs).E2Ts), sbi) + s.updateE2TEndpoints(&((*rcs).E2Ts), sbi) s.pruneEndpointList(sbi) } @@ -124,8 +128,8 @@ func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) XAppType: PlatformType, Ip: pc.Fqdn, Port: pc.Port, - TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"], - RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"], + //TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"], + //RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"], Socket: nil, IsReady: false, Keepalive: true, @@ -141,48 +145,71 @@ func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) } func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) { - xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts) - for _, e2t := range *E2Ts { - uuid := e2t.Fqdn - stringSlice := strings.Split(e2t.Fqdn, ":") - ipaddress := stringSlice[0] - port, _ := strconv.Atoi(stringSlice[1]) - if _, ok := rtmgr.Eps[uuid]; ok { - rtmgr.Eps[uuid].Keepalive = true - } else { - ep := &rtmgr.Endpoint{ - Uuid: uuid, - Name: e2t.Name, - XAppType: PlatformType, - Ip: ipaddress, - Port: uint16(port), - TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"], - RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"], - Socket: nil, - IsReady: false, - Keepalive: true, - } - xapp.Logger.Debug("ep created: %v", ep) - if err := sbi.AddEndpoint(ep); err != nil { - xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) - continue - } - rtmgr.Eps[uuid] = ep - } - } + xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts) + for _, e2t := range *E2Ts { + uuid := e2t.Fqdn + stringSlice := strings.Split(e2t.Fqdn, ":") + ipaddress := stringSlice[0] + port, _ := strconv.Atoi(stringSlice[1]) + if _, ok := rtmgr.Eps[uuid]; ok { + rtmgr.Eps[uuid].Keepalive = true + } else { + ep := &rtmgr.Endpoint{ + Uuid: uuid, + Name: e2t.Name, + XAppType: PlatformType, + Ip: ipaddress, + Port: uint16(port), + //TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"], + //RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"], + Socket: nil, + IsReady: false, + Keepalive: true, + } + xapp.Logger.Debug("ep created: %v", ep) + if err := sbi.AddEndpoint(ep); err != nil { + xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) + continue + } + rtmgr.Eps[uuid] = ep + } + } } -func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) { +func (s *Sbi) createEndpoint(payload string,rmrsrc string, sbi Engine) (*string,int) { xapp.Logger.Debug("CreateEndPoint %v", payload) - stringSlice := strings.Split(payload, " ") - uuid := stringSlice[0] - xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) - +// stringSlice := strings.Split(payload, " ") +// uuid := stringSlice[0] +// xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) - if _, ok := rtmgr.Eps[uuid]; ok { +/* if _, ok := rtmgr.Eps[uuid]; ok { ep := rtmgr.Eps[uuid] return ep + }*/ + + /* incase the stored Endpoint list is in the form of IP:port*/ +/* stringsubsplit := strings.Split(uuid, ":") + addr, err := net.LookupIP(stringsubsplit[0]) + if err == nil { + convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1]) + xapp.Logger.Info(" IP:Port received is %s", convertedUuid) + if _, ok := rtmgr.Eps[convertedUuid]; ok { + ep := rtmgr.Eps[convertedUuid] + return ep + } + }*/ + + /* Create a new mapping, this case is assumed for multiple process sending RMR request from a container */ + srcString := strings.Split(rmrsrc," ") + srcStringSlice := strings.Split(srcString[0],"=") + Whid := int(xapp.Rmr.Openwh(srcStringSlice[1])) + + xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s",Whid,srcStringSlice[1]) + if Whid > 0 { +// rtmgr.RmrEp[srcStringSlice[1]] = Whid + xapp.Logger.Info("received %s and mapped to Whid = %d",srcStringSlice[1],Whid) + return &srcStringSlice[1],Whid } - return nil -} + return nil,Whid + }