import (
"errors"
- "fmt"
- "rtmgr"
"strconv"
+ "routing-manager/pkg/rtmgr"
)
const DEFAULT_NNG_PUBSUB_SOCKET_PREFIX = "tcp://"
const DEFAULT_NNG_PUBSUB_SOCKET_NUMBER = 4560
const DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://"
const DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561
+const PLATFORMTYPE = "platformcomponenttype"
var (
SupportedSbis = []*SbiEngineConfig{
&SbiEngineConfig{
- SbiEngine{
- Name: "nngpub",
- Version: "v1",
- Protocol: "nngpubsub",
- },
- openSocket(openNngPub),
- closeSocket(closeNngPub),
- createEndpointSocket(createNngPubEndpointSocket),
- destroyEndpointSocket(createNngPubEndpointSocket),
- distributeAll(publishAll),
- true,
- },
+ Name: "nngpush",
+ Version: "v1",
+ Protocol: "nngpipeline",
+ Instance: NewNngPush(),
+ IsAvailable: true,
+ },
&SbiEngineConfig{
- SbiEngine{
- Name: "nngpush",
- Version: "v1",
- Protocol: "nngpipeline",
- },
- openSocket(openNngPush),
- closeSocket(closeNngPush),
- createEndpointSocket(createNngPushEndpointSocket),
- destroyEndpointSocket(destroyNngPushEndpointSocket),
- distributeAll(pushAll),
- true,
- },
+ Name: "nngpub",
+ Version: "v1",
+ Protocol: "nngpubsub",
+ Instance: NewNngPub(),
+ IsAvailable: true,
+ },
}
)
-func ListSbis() {
- fmt.Printf("SBI:\n")
+func GetSbi(sbiName string) (SbiEngine, error) {
for _, sbi := range SupportedSbis {
- if sbi.IsAvailable {
- rtmgr.Logger.Info(sbi.Engine.Name + "/" + sbi.Engine.Version)
+ if sbi.Name == sbiName && sbi.IsAvailable {
+ return sbi.Instance, nil
}
}
+ return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
}
-func GetSbi(sbiName string) (*SbiEngineConfig, error) {
- for _, sbi := range SupportedSbis {
- if (*sbi).Engine.Name == sbiName && (*sbi).IsAvailable {
- return sbi, nil
- }
- }
- return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
+type Sbi struct {
+
}
-func pruneEndpointList(sbii *SbiEngineConfig) {
+func (s *Sbi) pruneEndpointList(sbii SbiEngine) {
for _, ep := range rtmgr.Eps {
if !ep.Keepalive {
- sbii.DestroyEndpointSocket(ep)
+ rtmgr.Logger.Debug("deleting %v",ep)
+ sbii.DeleteEndpoint(ep)
delete(rtmgr.Eps, ep.Uuid)
} else {
rtmgr.Eps[ep.Uuid].Keepalive = false
}
}
-func UpdateEndpointList(xapps *[]rtmgr.XApp, sbii *SbiEngineConfig) {
- for _, xapp := range *xapps {
+func (s *Sbi) updateEndpoints(rcs *rtmgr.RicComponents, sbii SbiEngine) {
+ for _, xapp := range (*rcs).Xapps {
for _, instance := range xapp.Instances {
uuid := instance.Ip + ":" + strconv.Itoa(int(instance.Port))
if _, ok := rtmgr.Eps[uuid]; ok {
false,
true,
}
- if err := sbii.CreateEndpointSocket(ep); err != nil {
+ if err := sbii.AddEndpoint(ep); err != nil {
rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
continue
}
}
}
}
- pruneEndpointList(sbii)
+ s.updatePlatformEndpoints(&((*rcs).Pcs), sbii)
+ s.pruneEndpointList(sbii)
+}
+
+func (s *Sbi ) updatePlatformEndpoints(pcs *rtmgr.PlatformComponents, sbii SbiEngine) {
+ rtmgr.Logger.Debug("updatePlatformEndpoints invoked. PCS: %v", *pcs)
+ for _, pc := range *pcs {
+ uuid := pc.Fqdn + ":" + strconv.Itoa(int(pc.Port))
+ if _, ok := rtmgr.Eps[uuid]; ok {
+ rtmgr.Eps[uuid].Keepalive = true
+ } else {
+ ep := &rtmgr.Endpoint{
+ uuid,
+ pc.Name,
+ PLATFORMTYPE,
+ pc.Fqdn,
+ pc.Port,
+ rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
+ rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
+ nil,
+ false,
+ true,
+ }
+ rtmgr.Logger.Debug("ep created: %v",ep)
+ if err := sbii.AddEndpoint(ep); err != nil {
+ rtmgr.Logger.Error("can't create socket for endpoint: " + ep.Name + " due to:" + err.Error())
+ continue
+ }
+ rtmgr.Eps[uuid] = ep
+ }
+ }
}
+