X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1-go%2Fpkg%2Frmr%2Frmr.go;h=c0366af02a79280e0876b7be93121b71f81ca80e;hb=9f4fe4d725c9ad77ea1c2ab565ce5759e8a932a6;hp=660089bcc97c275b5d1ba9f9924a106557079cda;hpb=159ddb7dd65305d364743d4728af35f84a6e1557;p=ric-plt%2Fa1.git diff --git a/a1-go/pkg/rmr/rmr.go b/a1-go/pkg/rmr/rmr.go index 660089b..c0366af 100644 --- a/a1-go/pkg/rmr/rmr.go +++ b/a1-go/pkg/rmr/rmr.go @@ -22,37 +22,81 @@ package rmr import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models" + "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" ) const ( - a1SourceName = "service-ricplt-a1mediator-http" + a1SourceName = "service-ricplt-a1mediator-http" + a1PolicyRequest = 20010 + ecsServiceHost = "http://ecs-service:8083" + ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes" + ecsEiJobPath = ecsServiceHost + "/A1-EI/v1/eijobs/" + a1EiQueryAllResp = 20014 + a1EiCreateJobResp = 20016 + jobCreationData = `{"ei_job_id": %s.}` ) type RmrSender struct { - rmrclient *xapp.RMRClient + rmrclient *xapp.RMRClient + policyManager *policy.PolicyManager } type IRmrSender interface { RmrSendToXapp(httpBodyString string, messagetype int) bool } -func NewRMRSender() IRmrSender { +func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender { RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{ StatDesc: "", RmrData: xapp.PortData{ + //TODO: Read configuration from config file Name: "", MaxSize: 65534, ThreadType: 0, LowLatency: false, FastAck: false, MaxRetryOnFailure: 1, + Port: 4561, }, }) - return &RmrSender{ - rmrclient: RMRclient, + + rmrsender := &RmrSender{ + rmrclient: RMRclient, + policyManager: policyManager, + } + + rmrsender.RmrRecieveStart() + return rmrsender +} + +var RICMessageTypes = map[string]int{ + "A1_POLICY_REQ": 20010, + "A1_POLICY_RESP": 20011, + "A1_POLICY_QUERY": 20012, + "A1_EI_QUERY_ALL": 20013, + "AI_EI_QUERY_ALL_RESP": 20014, + "A1_EI_CREATE_JOB": 20015, + "A1_EI_CREATE_JOB_RESP": 20016, + "A1_EI_DATA_DELIVERY": 20017, +} + +func (rmr *RmrSender) GetRicMessageName(id int) (s string) { + for k, v := range RICMessageTypes { + if id == v { + return k + } } + return } func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool { @@ -71,3 +115,155 @@ func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool a1.Logger.Debug("rmrSendToXapp: sending: %+v", s) return s } + +func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) { + a1.Logger.Debug("In the Consume function") + id := rmr.GetRicMessageName(msg.Mtype) + a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen) + + switch id { + + case "A1_POLICY_RESP": + a1.Logger.Debug("Recived policy responose") + payload := msg.Payload + a1.Logger.Debug("message recieved : %s", payload) + var result map[string]interface{} + err := json.Unmarshal([]byte(payload), &result) + if err != nil { + a1.Logger.Error("Unmarshal error : %+v", err) + return err + } + a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"]) + rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string)) + case "A1_POLICY_QUERY": + a1.Logger.Debug("Recived policy query") + a1.Logger.Debug("message recieved ", msg.Payload) + payload := msg.Payload + var result map[string]interface{} + json.Unmarshal([]byte(payload), &result) + a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"]) + policytypeid := (result["policy_type_id"].(float64)) + instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid)) + if err1 != nil { + a1.Logger.Error("Error : %+v", err1) + return err1 + } + a1.Logger.Debug("instanceList ", instanceList) + a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid) + for _, policyinstanceid := range instanceList { + policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid) + if err2 != nil { + a1.Logger.Error("Error : %+v", err2) + return err2 + } + a1.Logger.Debug("policyinstance ", policyinstance.(string)) + message := Message{} + rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE") + if err1 != nil { + a1.Logger.Error("error : %v", err1) + return err1 + } + a1.Logger.Debug("rmrMessage ", rmrMessage) + isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest) + if isSent { + a1.Logger.Debug("rmrSendToXapp : message sent") + } else { + a1.Logger.Error("rmrSendToXapp : message not sent") + } + } + + case "A1_EI_QUERY_ALL": + a1.Logger.Debug("message recieved ", msg.Payload) + resp, err := http.Get(ecsEiTypePath) + if err != nil { + a1.Logger.Error("Received error while fetching health info: %v", err) + } + if resp.StatusCode != http.StatusOK { + a1.Logger.Warning("Received no reponse from A1-EI service1") + } + a1.Logger.Debug("response from A1-EI service : ", resp) + + defer resp.Body.Close() + respByte, err := ioutil.ReadAll(resp.Body) + + if err != nil { + a1.Logger.Debug("error in response: %+v", respByte) + } + + a1.Logger.Debug("response : %+v", string(respByte)) + + isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp) + if isSent { + a1.Logger.Debug("rmrSendToXapp : message sent") + } else { + a1.Logger.Error("rmrSendToXapp : message not sent") + } + case "A1_EI_CREATE_JOB": + payload := msg.Payload + a1.Logger.Debug("message recieved : %s", payload) + + var result map[string]interface{} + + err := json.Unmarshal([]byte(payload), &result) + if err != nil { + a1.Logger.Error("Unmarshal error : %+v", err) + return err + } + a1.Logger.Debug("Unmarshaled message recieved : %s ", result) + + jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10) + jsonReq, err := json.Marshal(result) + if err != nil { + a1.Logger.Error("marshal error : %v", err) + return err + } + + a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr) + req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq)) + if err != nil { + a1.Logger.Error("http error : %v", err) + return err + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + client := &http.Client{} + resp, err3 := client.Do(req) + if err3 != nil { + a1.Logger.Error("error:", err3) + return err + } + + defer resp.Body.Close() + + a1.Logger.Debug("response status : ", resp.StatusCode) + if resp.StatusCode == 200 || resp.StatusCode == 201 { + a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr) + rmrData := fmt.Sprintf(jobCreationData, jobIdStr) + a1.Logger.Debug("rmr_Data to send: ", rmrData) + + isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp) + if isSent { + a1.Logger.Debug("rmrSendToXapp : message sent") + } else { + a1.Logger.Error("rmrSendToXapp : message not sent") + } + } else { + a1.Logger.Warning("failed to create EIJOB ") + } + + default: + xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype) + } + + defer func() { + rmr.rmrclient.Free(msg.Mbuf) + msg.Mbuf = nil + }() + return +} + +func (rmr *RmrSender) RmrRecieveStart() { + a1.Logger.Debug("Inside RmrRecieveStart function ") + go rmr.rmrclient.Start(rmr) + a1.Logger.Debug("Reciever started") +}