X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fsbi%2Fnngpush.go;fp=pkg%2Fsbi%2Fnngpush.go;h=b270abfb9cedf0326deb25d2439f8892a3df761b;hb=45b86cc64dc6071f160b0f2c9ab8dfb57d1ce039;hp=44512999cba04b37142b1ae9284272926d5289c9;hpb=41e32c6cd23e3ac33e4b004b0fde57e371d02c81;p=ric-plt%2Frtmgr.git diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go index 4451299..b270abf 100644 --- a/pkg/sbi/nngpush.go +++ b/pkg/sbi/nngpush.go @@ -43,8 +43,6 @@ package sbi import "C" import ( - "bytes" - "crypto/md5" "errors" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "nanomsg.org/go/mangos/v2" @@ -53,7 +51,6 @@ import ( "routing-manager/pkg/rtmgr" "strconv" "time" - "fmt" ) type NngPush struct { @@ -62,18 +59,6 @@ type NngPush struct { rcChan chan *xapp.RMRParams } -type RMRParams struct { - *xapp.RMRParams -} - - -func (params *RMRParams) String() string { - var b bytes.Buffer - sum := md5.Sum(params.Payload) - fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum) - return b.String() -} - func NewNngPush() *NngPush { instance := new(NngPush) instance.NewSocket = createNewPushSocket @@ -117,25 +102,28 @@ func (c *NngPush) Terminate() error { } func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error { - + var err error + var socket NngSocket xapp.Logger.Debug("Invoked sbi.AddEndpoint") xapp.Logger.Debug("args: %v", *ep) - endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber) - ep.Whid = int(xapp.Rmr.Openwh(endpoint)) - if ep.Whid < 0 { - return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid)) - }else { - xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint) + socket, err = c.NewSocket() + if err != nil { + return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error()) + } + ep.Socket = socket + err = c.dial(ep) + if err != nil { + return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error()) } - return nil } func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error { xapp.Logger.Debug("Invoked sbi. DeleteEndpoint") xapp.Logger.Debug("args: %v", *ep) - - xapp.Rmr.Closewh(ep.Whid) + if err := ep.Socket.(NngSocket).Close(); err != nil { + return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error()) + } return nil } @@ -160,40 +148,28 @@ func (c *NngPush) dial(ep *rtmgr.Endpoint) error { func (c *NngPush) DistributeAll(policies *[]string) error { xapp.Logger.Debug("Invoked: sbi.DistributeAll") xapp.Logger.Debug("args: %v", *policies) - for _, ep := range rtmgr.Eps { - go c.send(ep, policies) + i := 1 + for i < 5 { + if ep.IsReady { + go c.send(ep, policies) + break + } else { + xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i)) + time.Sleep(10 * time.Millisecond) + i++ + } + } } - return nil } func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) { xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid) - for _, pe := range *policies { - params := &RMRParams{&xapp.RMRParams{}} - params.Mtype = 20 - params.PayloadLen = len([]byte(pe)) - params.Payload =[]byte(pe) - params.Mbuf = nil - params.Whid = ep.Whid - time.Sleep(1 * time.Millisecond) - xapp.Rmr.SendMsg(params.RMRParams) + if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil { + xapp.Logger.Error("Unable to send policy entry due to: " + err.Error()) + } } xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")") } - -func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){ - return c.createEndpoint(payload, c) -} - -func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error { - xapp.Logger.Debug("Invoked: sbi.DistributeToEp") - xapp.Logger.Debug("args: %v", *policies) - - go c.send(ep, policies) - - return nil -} -