X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fsbi%2Fsbi.go;h=58fe7d8e64b94046071652f78ca2ec8033948c8e;hb=41e32c6cd23e3ac33e4b004b0fde57e371d02c81;hp=83b3790c7c159dba20c7ccf5303d22183cfc2354;hpb=16d84d6f7d3489e65e0a83ba9c0d5d62c3914c7f;p=ric-plt%2Frtmgr.git diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go index 83b3790..58fe7d8 100644 --- a/pkg/sbi/sbi.go +++ b/pkg/sbi/sbi.go @@ -15,6 +15,10 @@ w WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + + This source code is part of the near-RT RIC (RAN Intelligent Controller) + platform project (RICP). + ================================================================================== */ /* @@ -27,102 +31,158 @@ package sbi import ( "errors" - "fmt" - "rtmgr" - "strconv" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "routing-manager/pkg/rtmgr" + "strconv" + "strings" ) -const DEFAULT_NNG_PUBSUB_SOCKET_PREFIX = "tcp://" -const DEFAULT_NNG_PUBSUB_SOCKET_NUMBER = 4560 -const DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://" -const DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561 +const DefaultNngPipelineSocketPrefix = "tcp://" +const DefaultNngPipelineSocketNumber = 4561 +const PlatformType = "platform" var ( - SupportedSbis = []*SbiEngineConfig{ - &SbiEngineConfig{ - SbiEngine{ - Name: "nngpub", - Version: "v1", - Protocol: "nngpubsub", - }, - openSocket(openNngPub), - closeSocket(closeNngPub), - createEndpointSocket(createNngPubEndpointSocket), - destroyEndpointSocket(createNngPubEndpointSocket), - distributeAll(publishAll), - true, - }, - &SbiEngineConfig{ - SbiEngine{ - Name: "nngpush", - Version: "v1", - Protocol: "nngpipeline", - }, - openSocket(openNngPush), - closeSocket(closeNngPush), - createEndpointSocket(createNngPushEndpointSocket), - destroyEndpointSocket(destroyNngPushEndpointSocket), - distributeAll(pushAll), - true, + SupportedSbis = []*EngineConfig{ + { + Name: "nngpush", + Version: "v1", + Protocol: "nngpipeline", + Instance: NewNngPush(), + IsAvailable: true, }, } ) -func ListSbis() { - fmt.Printf("SBI:\n") +func GetSbi(sbiName string) (Engine, error) { for _, sbi := range SupportedSbis { - if sbi.IsAvailable { - rtmgr.Logger.Info(sbi.Engine.Name + "/" + sbi.Engine.Version) + if sbi.Name == sbiName && sbi.IsAvailable { + return sbi.Instance, nil } } + return nil, errors.New("SBI:" + sbiName + " is not supported or still not available") } -func GetSbi(sbiName string) (*SbiEngineConfig, error) { - for _, sbi := range SupportedSbis { - if (*sbi).Engine.Name == sbiName && (*sbi).IsAvailable { - return sbi, nil +type Sbi struct { +} + +func (s *Sbi) pruneEndpointList(sbi Engine) { + xapp.Logger.Debug("pruneEndpointList invoked.") + for _, ep := range rtmgr.Eps { + if !ep.Keepalive { + xapp.Logger.Debug("deleting %v", ep) + sbi.DeleteEndpoint(ep) + delete(rtmgr.Eps, ep.Uuid) + } else { + rtmgr.Eps[ep.Uuid].Keepalive = false } } - return nil, errors.New("SBI:" + sbiName + " is not supported or still not available") } -func pruneEndpointList(sbii *SbiEngineConfig) { - for _, ep := range rtmgr.Eps { - if !ep.Keepalive { - sbii.DestroyEndpointSocket(ep) - delete(rtmgr.Eps, ep.Uuid) - } else { - rtmgr.Eps[ep.Uuid].Keepalive = false - } - } +func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) { + for _, xapps := range (*rcs).XApps { + for _, instance := range xapps.Instances { + uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port)) + if _, ok := rtmgr.Eps[uuid]; ok { + rtmgr.Eps[uuid].Keepalive = true + } else { + ep := &rtmgr.Endpoint{ + Uuid: uuid, + Name: instance.Name, + XAppType: xapps.Name, + Ip: instance.Ip, + Port: instance.Port, + TxMessages: instance.TxMessages, + RxMessages: instance.RxMessages, + Policies: instance.Policies, + Socket: nil, + IsReady: false, + Keepalive: true, + } + if err := sbi.AddEndpoint(ep); err != nil { + xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) + continue + } + rtmgr.Eps[uuid] = ep + } + } + } + s.updatePlatformEndpoints(&((*rcs).Pcs), sbi) + s.updateE2TEndpoints(&((*rcs).E2Ts), sbi) + s.pruneEndpointList(sbi) } -func UpdateEndpointList(xapps *[]rtmgr.XApp, sbii *SbiEngineConfig) { - for _, xapp := range *xapps { - for _, instance := range xapp.Instances { - uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port)) - if _, ok := rtmgr.Eps[uuid]; ok { - rtmgr.Eps[uuid].Keepalive = true - } else { - ep := &rtmgr.Endpoint{ - uuid, - instance.Name, - xapp.Name, - instance.Ip, - instance.Port, - instance.TxMessages, - instance.RxMessages, - nil, - false, - true, - } - if err := sbii.CreateEndpointSocket(ep); err != nil { - rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) - continue - } - rtmgr.Eps[uuid] = ep +func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) { + xapp.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs) + for _, pc := range *pcs { + uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port)) + if _, ok := rtmgr.Eps[uuid]; ok { + rtmgr.Eps[uuid].Keepalive = true + } else { + ep := &rtmgr.Endpoint{ + Uuid: uuid, + Name: pc.Name, + XAppType: PlatformType, + Ip: pc.Fqdn, + Port: pc.Port, + TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"], + RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"], + Socket: nil, + IsReady: false, + Keepalive: true, + } + xapp.Logger.Debug("ep created: %v", ep) + if err := sbi.AddEndpoint(ep); err != nil { + xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) + continue + } + rtmgr.Eps[uuid] = ep + } + } +} + +func (s *Sbi) updateE2TEndpoints(E2Ts *map[string]rtmgr.E2TInstance, sbi Engine) { + xapp.Logger.Debug("updateE2TEndpoints invoked. E2T: %v", *E2Ts) + for _, e2t := range *E2Ts { + uuid := e2t.Fqdn + stringSlice := strings.Split(e2t.Fqdn, ":") + ipaddress := stringSlice[0] + port, _ := strconv.Atoi(stringSlice[1]) + if _, ok := rtmgr.Eps[uuid]; ok { + rtmgr.Eps[uuid].Keepalive = true + } else { + ep := &rtmgr.Endpoint{ + Uuid: uuid, + Name: e2t.Name, + XAppType: PlatformType, + Ip: ipaddress, + Port: uint16(port), + TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"], + RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"], + Socket: nil, + IsReady: false, + Keepalive: true, + } + xapp.Logger.Debug("ep created: %v", ep) + if err := sbi.AddEndpoint(ep); err != nil { + xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error()) + continue } + rtmgr.Eps[uuid] = ep } } - pruneEndpointList(sbii) +} + +func (s *Sbi) createEndpoint(payload string, sbi Engine) (*rtmgr.Endpoint) { + xapp.Logger.Debug("CreateEndPoint %v", payload) + stringSlice := strings.Split(payload, " ") + uuid := stringSlice[0] + xapp.Logger.Debug(">>> uuid %v", stringSlice[0]) + + + if _, ok := rtmgr.Eps[uuid]; ok { + ep := rtmgr.Eps[uuid] + return ep + } + + return nil }