X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1-go%2Fpkg%2Frmr%2Frmr.go;h=2bb2f1b2794c79612d9e07c7cbf245777cd62076;hb=521fb5516a4212246ff977a849dfd24f0c87b2a6;hp=78adf46499bf23c889782c115ed9d3382f19beb0;hpb=1a41ce75dbf508ae4d85c2dc4a562f36da04bb65;p=ric-plt%2Fa1.git diff --git a/a1-go/pkg/rmr/rmr.go b/a1-go/pkg/rmr/rmr.go index 78adf46..2bb2f1b 100644 --- a/a1-go/pkg/rmr/rmr.go +++ b/a1-go/pkg/rmr/rmr.go @@ -22,44 +22,82 @@ package rmr import ( + "encoding/json" + "io/ioutil" + "net/http" + "strconv" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" ) const ( - a1PolicyRequest = 20010 - a1SourceName = "service-ricplt-a1mediator-http" + a1SourceName = "service-ricplt-a1mediator-http" + a1PolicyRequest = 20010 + ecsServiceHost = "http://ecs-service:8083" + ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes" + a1EiQueryAllResp = 20014 ) type RmrSender struct { - rmrclient *xapp.RMRClient + rmrclient *xapp.RMRClient + policyManager *policy.PolicyManager } type IRmrSender interface { - RmrSendToXapp(httpBodyString string) bool + RmrSendToXapp(httpBodyString string, messagetype int) bool } -func NewRMRSender() IRmrSender { +func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender { RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{ StatDesc: "", RmrData: xapp.PortData{ + //TODO: Read configuration from config file Name: "", MaxSize: 65534, ThreadType: 0, LowLatency: false, FastAck: false, MaxRetryOnFailure: 1, + Port: 4561, }, }) - return &RmrSender{ - rmrclient: RMRclient, + + rmrsender := &RmrSender{ + rmrclient: RMRclient, + policyManager: policyManager, } + + rmrsender.RmrRecieveStart() + return rmrsender } -func (rmr *RmrSender) RmrSendToXapp(httpBodyString string) bool { +var RICMessageTypes = map[string]int{ + "A1_POLICY_REQ": 20010, + "A1_POLICY_RESP": 20011, + "A1_POLICY_QUERY": 20012, + "A1_EI_QUERY_ALL": 20013, + "AI_EI_QUERY_ALL_RESP": 20014, + "A1_EI_CREATE_JOB": 20015, + "A1_EI_CREATE_JOB_RESP": 20016, + "A1_EI_DATA_DELIVERY": 20017, +} + +func (rmr *RmrSender) GetRicMessageName(id int) (s string) { + for k, v := range RICMessageTypes { + if id == v { + return k + } + } + return +} + +func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool { params := &xapp.RMRParams{} - params.Mtype = a1PolicyRequest + params.Mtype = messagetype params.SubId = -1 params.Xid = "" params.Meid = &xapp.RMRMeid{} @@ -72,3 +110,103 @@ func (rmr *RmrSender) RmrSendToXapp(httpBodyString string) bool { a1.Logger.Debug("rmrSendToXapp: sending: %+v", s) return s } + +func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) { + a1.Logger.Debug("In the Consume function") + id := rmr.GetRicMessageName(msg.Mtype) + a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen) + + switch id { + + case "A1_POLICY_RESP": + a1.Logger.Debug("Recived policy responose") + payload := msg.Payload + a1.Logger.Debug("message recieved : %s", payload) + var result map[string]interface{} + err := json.Unmarshal([]byte(payload), &result) + if err != nil { + a1.Logger.Error("Unmarshal error : %+v", err) + return err + } + a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"]) + rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string)) + case "A1_POLICY_QUERY": + a1.Logger.Debug("Recived policy query") + a1.Logger.Debug("message recieved ", msg.Payload) + payload := msg.Payload + var result map[string]interface{} + json.Unmarshal([]byte(payload), &result) + a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"]) + policytypeid := (result["policy_type_id"].(float64)) + instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid)) + if err1 != nil { + a1.Logger.Error("Error : %+v", err1) + return err1 + } + a1.Logger.Debug("instanceList ", instanceList) + a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid) + for _, policyinstanceid := range instanceList { + policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid) + if err2 != nil { + a1.Logger.Error("Error : %+v", err2) + return err2 + } + a1.Logger.Debug("policyinstance ", policyinstance.(string)) + message := Message{} + rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE") + if err1 != nil { + a1.Logger.Error("error : %v", err1) + return err1 + } + a1.Logger.Debug("rmrMessage ", rmrMessage) + isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest) + if isSent { + a1.Logger.Debug("rmrSendToXapp : message sent") + } else { + a1.Logger.Error("rmrSendToXapp : message not sent") + } + } + + case "A1_EI_QUERY_ALL": + a1.Logger.Debug("message recieved ", msg.Payload) + resp, err := http.Get(ecsEiTypePath) + if err != nil { + a1.Logger.Error("Received error while fetching health info: %v", err) + } + if resp.StatusCode != http.StatusOK { + a1.Logger.Warning("Received no reponse from A1-EI service1") + } + a1.Logger.Debug("response from A1-EI service : ", resp) + + defer resp.Body.Close() + respByte, err := ioutil.ReadAll(resp.Body) + + if err != nil { + a1.Logger.Debug("error in response: %+v", respByte) + } + + a1.Logger.Debug("response : %+v", string(respByte)) + + isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp) + if isSent { + a1.Logger.Debug("rmrSendToXapp : message sent") + } else { + a1.Logger.Error("rmrSendToXapp : message not sent") + } + + default: + xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype) + } + + defer func() { + rmr.rmrclient.Free(msg.Mbuf) + msg.Mbuf = nil + }() + return +} + +func (rmr *RmrSender) RmrRecieveStart() { + a1.Logger.Debug("Inside RmrRecieveStart function ") + rmr.rmrclient.Start(rmr) + a1.Logger.Debug("Reciever started") +}