Add route for e2nodeConfigUpdate
[ric-plt/rtmgr.git] / pkg / sbi / nngpush.go
index bb39be4..0d9634e 100644 (file)
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+
 ==================================================================================
 */
 /*
 ==================================================================================
 */
 /*
-  Mnemonic:    nngpipe.go
-  Abstract: mangos (NNG) Pipeline SBI implementation
+  Mnemonic:    rmrpipe.go
+  Abstract: mangos (RMR) Pipeline SBI implementation
   Date:                12 March 2019
 */
 
 package sbi
 
   Date:                12 March 2019
 */
 
 package sbi
 
+/*
+#include <rmr/rmr.h>
+*/
+import "C"
+
 import (
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "errors"
-       "nanomsg.org/go/mangos/v2"
-       "nanomsg.org/go/mangos/v2/protocol/push"
-       _ "nanomsg.org/go/mangos/v2/transport/all"
+       "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "routing-manager/pkg/rtmgr"
        "strconv"
        "routing-manager/pkg/rtmgr"
        "strconv"
+       "strings"
+       "sync"
+       "time"
 )
 
 )
 
-type NngPush struct {
+var rmrcallid = 1
+var rmrdynamiccallid = 201
+var addendpointct = 1
+var count int
+var Conn sync.Mutex
+
+type RmrPush struct {
        Sbi
        Sbi
-       NewSocket CreateNewNngSocketHandler
+       rcChan chan *xapp.RMRParams
 }
 
 }
 
-func NewNngPush() *NngPush {
-       instance := new(NngPush)
-       instance.NewSocket = createNewPushSocket
-       return instance
+type EPStatus struct {
+       endpoint string
+       status   bool
 }
 
 }
 
-func createNewPushSocket() (NngSocket, error) {
-       rtmgr.Logger.Debug("Invoked: createNewPushSocket()")
-       socket, err := push.NewSocket()
-       if err != nil {
-               return nil, errors.New("can't create new push socket due to:" + err.Error())
-       }
-       socket.SetPipeEventHook(pipeEventHandler)
-       return socket, nil
+type RMRParams struct {
+       *xapp.RMRParams
 }
 
 }
 
-func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
-       rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
-       for _, ep := range rtmgr.Eps {
-               uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
-               if uri == pipe.Address() {
-                       switch event {
-                       case 1:
-                               ep.IsReady = true
-                               rtmgr.Logger.Debug("Endpoint " + uri + " successfully attached")
-                       default:
-                               ep.IsReady = false
-                               rtmgr.Logger.Debug("Endpoint " + uri + " has been detached")
-                       }
-               }
-       }
+func (params *RMRParams) String() string {
+       var b bytes.Buffer
+       sum := md5.Sum(params.Payload)
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
+       return b.String()
+}
+
+func NewRmrPush() *RmrPush {
+       instance := new(RmrPush)
+       return instance
 }
 
 }
 
-func (c *NngPush) Initialize(ip string) error {
+func (c *RmrPush) Initialize(ip string) error {
        return nil
 }
 
        return nil
 }
 
-func (c *NngPush) Terminate() error {
+func (c *RmrPush) Terminate() error {
        return nil
 }
 
        return nil
 }
 
-func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-       var err error
-       var socket NngSocket
-       rtmgr.Logger.Debug("Invoked sbi.AddEndpoint")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       socket, err = c.NewSocket()
-       if err != nil {
-               return errors.New("can't add new socket to endpoint:" + ep.Uuid + " due to: " + err.Error())
-       }
-       ep.Socket = socket
-       err = c.dial(ep)
-       if err != nil {
-               return errors.New("can't dial to endpoint:" + ep.Uuid + " due to: " + err.Error())
+func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
+       addendpointct = addendpointct + 1
+       count := addendpointct
+       xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
+       endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
+       ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+       if ep.Whid < 0 {
+               time.Sleep(time.Duration(10) * time.Second)
+               ep.Whid = int(xapp.Rmr.Openwh(endpoint))
+               if ep.Whid < 0 {
+                       return errors.New("cannot open wormhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid) + " count: " + strconv.Itoa(count))
+               }
+       } else {
+               xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
        }
        }
+
        return nil
 }
 
        return nil
 }
 
-func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Invoked sbi. DeleteEndpoint")
-       rtmgr.Logger.Debug("args: %v", (*ep))
-       if err := ep.Socket.(NngSocket).Close(); err != nil {
-               return errors.New("can't close push socket of endpoint:" + ep.Uuid + " due to: " + err.Error())
-       }
+func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
+       xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
+       xapp.Logger.Debug("args: %v", *ep)
+
+       xapp.Rmr.Closewh(ep.Whid)
        return nil
 }
 
        return nil
 }
 
-func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
+func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
        c.updateEndpoints(rcs, c)
 }
 
        c.updateEndpoints(rcs, c)
 }
 
-/*
-NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
-*/
-func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
-       rtmgr.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
-       uri := DEFAULT_NNG_PIPELINE_SOCKET_PREFIX + ep.Ip + ":" + strconv.Itoa(DEFAULT_NNG_PIPELINE_SOCKET_NUMBER)
-       options := make(map[string]interface{})
-       options[mangos.OptionDialAsynch] = true
-       if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
-               return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
+func (c *RmrPush) DistributeAll(policies *[]string) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeAll")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       /*for _, ep := range rtmgr.Eps {
+               go c.send(ep, policies)
+       }*/
+       //channel := make(chan EPStatus)
+
+       if rmrcallid == 200 {
+               rmrcallid = 1
        }
        }
+
+       for _, ep := range rtmgr.Eps {
+               go c.send_sync(ep, policies, rmrcallid)
+       }
+
+       rmrcallid++
+
+       /*
+                               count := 0
+                       result := make([]EPStatus, len(rtmgr.Eps))
+                       for i, _ := range result {
+                               result[i] = <-channel
+                               if result[i].status == true {
+                                       count++
+                               } else {
+                                       xapp.Logger.Error("RMR sent failed for endpoint %v", result[i].endpoint)
+                               }
+                       }
+
+                       if count < len(rtmgr.Eps) {
+                               return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
+                       }*/
+
        return nil
 }
 
        return nil
 }
 
-func (c *NngPush) DistributeAll(policies *[]string) error {
-       rtmgr.Logger.Debug("Invoked: sbi.DistributeAll")
-       rtmgr.Logger.Debug("args: %v", (*policies))
-       for _, ep := range rtmgr.Eps {
-               if ep.IsReady {
-                       go c.send(ep, policies)
-               } else {
-                       rtmgr.Logger.Warn("Endpoint " + ep.Uuid + " is not ready")
+//func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
+func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, call_id int) {
+       xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+
+       ret := c.send_data(ep, policies, call_id)
+       for count = 0; count <= 2 && ret == false; count++ {
+               time.Sleep(time.Second)
+               ret := c.send_data(ep, policies, call_id)
+               if ret == true {
+                       break
+               }
+               xapp.Logger.Error("Invoked  send_data to try again due to return value : %v", ret)
+       }
+
+       Conn.Lock()
+       rtmgr.RMRConnStatus[ep.Uuid] = ret
+       Conn.Unlock()
+       // Handling per connection .. may be updating global map
+
+       //channel <- EPStatus{ep.Uuid, ret}
+
+}
+
+func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
+       xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
+       var state int
+       var retstr string
+
+       var policy = []byte{}
+
+       for _, pe := range *policies {
+               b := []byte(pe)
+               for j := 0; j < len(b); j++ {
+                       policy = append(policy, b[j])
                }
        }
                }
        }
+       params := &RMRParams{&xapp.RMRParams{}}
+       params.Mtype = 20
+       params.PayloadLen = len(policy)
+       params.Payload = []byte(policy)
+       params.Mbuf = nil
+       params.Whid = ep.Whid
+       params.Callid = call_id
+       params.Timeout = 200
+       state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+       routestatus := strings.Split(retstr, " ")
+       if state != C.RMR_OK && routestatus[0] != "OK" {
+               xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+               return false
+       } else {
+               xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+               return true
+       }
+}
+
+func (c *RmrPush) CheckEndpoint(payload string) (ep *rtmgr.Endpoint) {
+       return c.checkEndpoint(payload)
+}
+
+func (c *RmrPush) CreateEndpoint(rmrsrc string) (ep *string, whid int) {
+       return c.createEndpoint(rmrsrc)
+}
+
+func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
+       xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
+       xapp.Logger.Debug("args: %v", *policies)
+
+       if rmrdynamiccallid == 255 {
+               rmrdynamiccallid = 201
+       }
+
+       go c.sendDynamicRoutes(ep, whid, policies, rmrdynamiccallid)
+       rmrdynamiccallid++
+
        return nil
 }
 
        return nil
 }
 
-func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
-       rtmgr.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
+func (c *RmrPush) sendDynamicRoutes(ep string, whid int, policies *[]string, call_id int) bool {
+       xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
+       var state int
+       var retstr string
+
+       var policy = []byte{}
+
        for _, pe := range *policies {
        for _, pe := range *policies {
-               if err := ep.Socket.(NngSocket).Send([]byte(pe)); err != nil {
-                       rtmgr.Logger.Error("Unable to send policy entry due to: " + err.Error())
+               b := []byte(pe)
+               for j := 0; j < len(b); j++ {
+                       policy = append(policy, b[j])
                }
        }
                }
        }
-       rtmgr.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len((*policies))) + ")")
+       params := &RMRParams{&xapp.RMRParams{}}
+       params.Mtype = 20
+       params.PayloadLen = len(policy)
+       params.Payload = []byte(policy)
+       params.Mbuf = nil
+       params.Whid = whid
+       params.Callid = call_id
+       params.Timeout = 200
+       state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
+       routestatus := strings.Split(retstr, " ")
+       if state != C.RMR_OK && routestatus[0] != "OK" {
+               xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
+               return false
+       } else {
+               xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+               return true
+       }
 }
 }