RMR handler for A1 policy response
[ric-plt/a1.git] / a1-go / pkg / rmr / rmr.go
index 660089b..2e158f8 100644 (file)
 package rmr
 
 import (
+       "encoding/json"
+
        "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 )
 
@@ -31,28 +34,36 @@ const (
 )
 
 type RmrSender struct {
-       rmrclient *xapp.RMRClient
+       rmrclient     *xapp.RMRClient
+       policyManager *policy.PolicyManager
 }
 
 type IRmrSender interface {
        RmrSendToXapp(httpBodyString string, messagetype int) bool
 }
 
-func NewRMRSender() IRmrSender {
+func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
        RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
                StatDesc: "",
                RmrData: xapp.PortData{
+                       //TODO: Read configuration from config file
                        Name:              "",
                        MaxSize:           65534,
                        ThreadType:        0,
                        LowLatency:        false,
                        FastAck:           false,
                        MaxRetryOnFailure: 1,
+                       Port:              4561,
                },
        })
-       return &RmrSender{
-               rmrclient: RMRclient,
+
+       rmrsender := &RmrSender{
+               rmrclient:     RMRclient,
+               policyManager: policyManager,
        }
+
+       rmrsender.RmrRecieveStart()
+       return rmrsender
 }
 
 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
@@ -71,3 +82,39 @@ func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool
        a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
        return s
 }
+
+func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
+       a1.Logger.Debug("In the Consume function")
+       id := xapp.Rmr.GetRicMessageName(msg.Mtype)
+       a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
+
+       switch id {
+
+       case "A1_POLICY_RESP":
+               a1.Logger.Debug("Recived policy responose")
+               payload := msg.Payload
+               a1.Logger.Debug("message recieved : %s", payload)
+               var result map[string]interface{}
+               err := json.Unmarshal([]byte(payload), &result)
+               if err != nil {
+                       a1.Logger.Error("Unmarshal error : %+v", err)
+                       return err
+               }
+               a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
+               rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
+       default:
+               xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
+       }
+
+       defer func() {
+               rmr.rmrclient.Free(msg.Mbuf)
+               msg.Mbuf = nil
+       }()
+       return
+}
+
+func (rmr *RmrSender) RmrRecieveStart() {
+       a1.Logger.Debug("Inside RmrRecieveStart function ")
+       rmr.rmrclient.Start(rmr)
+       a1.Logger.Debug("Reciever started")
+}