WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
+
+ This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ platform project (RICP).
+
==================================================================================
*/
/*
import (
"errors"
- "fmt"
- "rtmgr"
- "strconv"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "routing-manager/pkg/rtmgr"
+ "strconv"
)
-const DEFAULT_NNG_PUBSUB_SOCKET_PREFIX = "tcp://"
-const DEFAULT_NNG_PUBSUB_SOCKET_NUMBER = 4560
-const DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://"
-const DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561
+const DefaultNngPipelineSocketPrefix = "tcp://"
+const DefaultNngPipelineSocketNumber = 4561
+const PlatformType = "platform"
var (
- SupportedSbis = []*SbiEngineConfig{
- &SbiEngineConfig{
- SbiEngine{
- Name: "nngpub",
- Version: "v1",
- Protocol: "nngpubsub",
- },
- openSocket(openNngPub),
- closeSocket(closeNngPub),
- createEndpointSocket(createNngPubEndpointSocket),
- destroyEndpointSocket(createNngPubEndpointSocket),
- distributeAll(publishAll),
- true,
- },
- &SbiEngineConfig{
- SbiEngine{
- Name: "nngpush",
- Version: "v1",
- Protocol: "nngpipeline",
- },
- openSocket(openNngPush),
- closeSocket(closeNngPush),
- createEndpointSocket(createNngPushEndpointSocket),
- destroyEndpointSocket(destroyNngPushEndpointSocket),
- distributeAll(pushAll),
- true,
+ SupportedSbis = []*EngineConfig{
+ {
+ Name: "nngpush",
+ Version: "v1",
+ Protocol: "nngpipeline",
+ Instance: NewNngPush(),
+ IsAvailable: true,
},
}
)
-func ListSbis() {
- fmt.Printf("SBI:\n")
+func GetSbi(sbiName string) (Engine, error) {
for _, sbi := range SupportedSbis {
- if sbi.IsAvailable {
- rtmgr.Logger.Info(sbi.Engine.Name + "/" + sbi.Engine.Version)
+ if sbi.Name == sbiName && sbi.IsAvailable {
+ return sbi.Instance, nil
}
}
+ return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
}
-func GetSbi(sbiName string) (*SbiEngineConfig, error) {
- for _, sbi := range SupportedSbis {
- if (*sbi).Engine.Name == sbiName && (*sbi).IsAvailable {
- return sbi, nil
+type Sbi struct {
+}
+
+func (s *Sbi) pruneEndpointList(sbi Engine) {
+ xapp.Logger.Debug("pruneEndpointList invoked.")
+ for _, ep := range rtmgr.Eps {
+ if !ep.Keepalive {
+ xapp.Logger.Debug("deleting %v", ep)
+ sbi.DeleteEndpoint(ep)
+ delete(rtmgr.Eps, ep.Uuid)
+ } else {
+ rtmgr.Eps[ep.Uuid].Keepalive = false
}
}
- return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
}
-func pruneEndpointList(sbii *SbiEngineConfig) {
- for _, ep := range rtmgr.Eps {
- if !ep.Keepalive {
- sbii.DestroyEndpointSocket(ep)
- delete(rtmgr.Eps, ep.Uuid)
- } else {
- rtmgr.Eps[ep.Uuid].Keepalive = false
- }
- }
+func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbi Engine) {
+ for _, xapps := range (*rcs).XApps {
+ for _, instance := range xapps.Instances {
+ uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
+ if _, ok := rtmgr.Eps[uuid]; ok {
+ rtmgr.Eps[uuid].Keepalive = true
+ } else {
+ ep := &rtmgr.Endpoint{
+ Uuid: uuid,
+ Name: instance.Name,
+ XAppType: xapps.Name,
+ Ip: instance.Ip,
+ Port: instance.Port,
+ TxMessages: instance.TxMessages,
+ RxMessages: instance.RxMessages,
+ Policies: instance.Policies,
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
+ }
+ if err := sbi.AddEndpoint(ep); err != nil {
+ xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ continue
+ }
+ rtmgr.Eps[uuid] = ep
+ }
+ }
+ }
+ s.updatePlatformEndpoints(&((*rcs).Pcs), sbi)
+ s.pruneEndpointList(sbi)
}
-func UpdateEndpointList(xapps *[]rtmgr.XApp, sbii *SbiEngineConfig) {
- for _, xapp := range *xapps {
- for _, instance := range xapp.Instances {
- uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
- if _, ok := rtmgr.Eps[uuid]; ok {
- rtmgr.Eps[uuid].Keepalive = true
- } else {
- ep := &rtmgr.Endpoint{
- uuid,
- instance.Name,
- xapp.Name,
- instance.Ip,
- instance.Port,
- instance.TxMessages,
- instance.RxMessages,
- nil,
- false,
- true,
- }
- if err := sbii.CreateEndpointSocket(ep); err != nil {
- rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
- continue
- }
- rtmgr.Eps[uuid] = ep
- }
- }
- }
- pruneEndpointList(sbii)
+func (s *Sbi) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbi Engine) {
+ xapp.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs)
+ for _, pc := range *pcs {
+ uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port))
+ if _, ok := rtmgr.Eps[uuid]; ok {
+ rtmgr.Eps[uuid].Keepalive = true
+ } else {
+ ep := &rtmgr.Endpoint{
+ Uuid: uuid,
+ Name: pc.Name,
+ XAppType: PlatformType,
+ Ip: pc.Fqdn,
+ Port: pc.Port,
+ TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
+ RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
+ }
+ xapp.Logger.Debug("ep created: %v", ep)
+ if err := sbi.AddEndpoint(ep); err != nil {
+ xapp.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ continue
+ }
+ rtmgr.Eps[uuid] = ep
+ }
+ }
}