X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=a1-go%2Fpkg%2Frmr%2Frmr.go;h=2e158f834ef199d26f5e7dc530fce564a0d9139c;hb=6d6fe01775c2dcf4049287abc1525b746bb8ec29;hp=660089bcc97c275b5d1ba9f9924a106557079cda;hpb=a5d992815f470a349dea86d616509222eb051f97;p=ric-plt%2Fa1.git diff --git a/a1-go/pkg/rmr/rmr.go b/a1-go/pkg/rmr/rmr.go index 660089b..2e158f8 100644 --- a/a1-go/pkg/rmr/rmr.go +++ b/a1-go/pkg/rmr/rmr.go @@ -22,7 +22,10 @@ package rmr import ( + "encoding/json" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" ) @@ -31,28 +34,36 @@ const ( ) type RmrSender struct { - rmrclient *xapp.RMRClient + rmrclient *xapp.RMRClient + policyManager *policy.PolicyManager } type IRmrSender interface { RmrSendToXapp(httpBodyString string, messagetype int) bool } -func NewRMRSender() IRmrSender { +func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender { RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{ StatDesc: "", RmrData: xapp.PortData{ + //TODO: Read configuration from config file Name: "", MaxSize: 65534, ThreadType: 0, LowLatency: false, FastAck: false, MaxRetryOnFailure: 1, + Port: 4561, }, }) - return &RmrSender{ - rmrclient: RMRclient, + + rmrsender := &RmrSender{ + rmrclient: RMRclient, + policyManager: policyManager, } + + rmrsender.RmrRecieveStart() + return rmrsender } func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool { @@ -71,3 +82,39 @@ func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool a1.Logger.Debug("rmrSendToXapp: sending: %+v", s) return s } + +func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) { + a1.Logger.Debug("In the Consume function") + id := xapp.Rmr.GetRicMessageName(msg.Mtype) + a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen) + + switch id { + + case "A1_POLICY_RESP": + a1.Logger.Debug("Recived policy responose") + payload := msg.Payload + a1.Logger.Debug("message recieved : %s", payload) + var result map[string]interface{} + err := json.Unmarshal([]byte(payload), &result) + if err != nil { + a1.Logger.Error("Unmarshal error : %+v", err) + return err + } + a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"]) + rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string)) + default: + xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype) + } + + defer func() { + rmr.rmrclient.Free(msg.Mbuf) + msg.Mbuf = nil + }() + return +} + +func (rmr *RmrSender) RmrRecieveStart() { + a1.Logger.Debug("Inside RmrRecieveStart function ") + rmr.rmrclient.Start(rmr) + a1.Logger.Debug("Reciever started") +}