import "C"
import (
- "bytes"
- "crypto/md5"
"errors"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"nanomsg.org/go/mangos/v2"
"routing-manager/pkg/rtmgr"
"strconv"
"time"
- "fmt"
)
type NngPush struct {
rcChan chan *xapp.RMRParams
}
-type RMRParams struct {
- *xapp.RMRParams
-}
-
-
-func (params *RMRParams) String() string {
- var b bytes.Buffer
- sum := md5.Sum(params.Payload)
- fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
- return b.String()
-}
-
func NewNngPush() *NngPush {
instance := new(NngPush)
instance.NewSocket = createNewPushSocket
}
func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-
+ var err error
+ var socket NngSocket
xapp.Logger.Debug("Invoked sbi.AddEndpoint")
xapp.Logger.Debug("args: %v", *ep)
- endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
- ep.Whid = int(xapp.Rmr.Openwh(endpoint))
- if ep.Whid < 0 {
- return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
- }else {
- xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
+ socket, err = c.NewSocket()
+ if err != nil {
+ return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
+ ep.Socket = socket
+ err = c.dial(ep)
+ if err != nil {
+ return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
}
-
return nil
}
func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
xapp.Logger.Debug("args: %v", *ep)
-
- xapp.Rmr.Closewh(ep.Whid)
+ if err := ep.Socket.(NngSocket).Close(); err != nil {
+ return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
+ }
return nil
}
func (c *NngPush) DistributeAll(policies *[]string) error {
xapp.Logger.Debug("Invoked: sbi.DistributeAll")
xapp.Logger.Debug("args: %v", *policies)
-
for _, ep := range rtmgr.Eps {
- go c.send(ep, policies)
+ i := 1
+ for i < 5 {
+ if ep.IsReady {
+ go c.send(ep, policies)
+ break
+ } else {
+ xapp.Logger.Warn("Endpoint " + ep.Uuid + " is not ready" + " Retry count " + strconv.Itoa(i))
+ time.Sleep(10 * time.Millisecond)
+ i++
+ }
+ }
}
-
return nil
}
func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
-
for _, pe := range *policies {
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 20
- params.PayloadLen = len([]byte(pe))
- params.Payload =[]byte(pe)
- params.Mbuf = nil
- params.Whid = ep.Whid
- time.Sleep(1 * time.Millisecond)
- xapp.Rmr.SendMsg(params.RMRParams)
+ if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
+ xapp.Logger.Error("Unable to send policy entry due to: " + err.Error())
+ }
}
xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
}
-
-func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
- return c.createEndpoint(payload, c)
-}
-
-func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
- xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
- xapp.Logger.Debug("args: %v", *policies)
-
- go c.send(ep, policies)
-
- return nil
-}
-