Replacing a1-python with a1-go implementation
[ric-plt/a1.git] / pkg / rmr / rmr.go
diff --git a/pkg/rmr/rmr.go b/pkg/rmr/rmr.go
new file mode 100644 (file)
index 0000000..c0366af
--- /dev/null
@@ -0,0 +1,269 @@
+/*
+==================================================================================
+  Copyright (c) 2022 Samsung
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+==================================================================================
+*/
+
+package rmr
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "strconv"
+
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
+       "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
+const (
+       a1SourceName      = "service-ricplt-a1mediator-http"
+       a1PolicyRequest   = 20010
+       ecsServiceHost    = "http://ecs-service:8083"
+       ecsEiTypePath     = ecsServiceHost + "/A1-EI/v1/eitypes"
+       ecsEiJobPath      = ecsServiceHost + "/A1-EI/v1/eijobs/"
+       a1EiQueryAllResp  = 20014
+       a1EiCreateJobResp = 20016
+       jobCreationData   = `{"ei_job_id": %s.}`
+)
+
+type RmrSender struct {
+       rmrclient     *xapp.RMRClient
+       policyManager *policy.PolicyManager
+}
+
+type IRmrSender interface {
+       RmrSendToXapp(httpBodyString string, messagetype int) bool
+}
+
+func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
+       RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
+               StatDesc: "",
+               RmrData: xapp.PortData{
+                       //TODO: Read configuration from config file
+                       Name:              "",
+                       MaxSize:           65534,
+                       ThreadType:        0,
+                       LowLatency:        false,
+                       FastAck:           false,
+                       MaxRetryOnFailure: 1,
+                       Port:              4561,
+               },
+       })
+
+       rmrsender := &RmrSender{
+               rmrclient:     RMRclient,
+               policyManager: policyManager,
+       }
+
+       rmrsender.RmrRecieveStart()
+       return rmrsender
+}
+
+var RICMessageTypes = map[string]int{
+       "A1_POLICY_REQ":         20010,
+       "A1_POLICY_RESP":        20011,
+       "A1_POLICY_QUERY":       20012,
+       "A1_EI_QUERY_ALL":       20013,
+       "AI_EI_QUERY_ALL_RESP":  20014,
+       "A1_EI_CREATE_JOB":      20015,
+       "A1_EI_CREATE_JOB_RESP": 20016,
+       "A1_EI_DATA_DELIVERY":   20017,
+}
+
+func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
+       for k, v := range RICMessageTypes {
+               if id == v {
+                       return k
+               }
+       }
+       return
+}
+
+func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
+
+       params := &xapp.RMRParams{}
+       params.Mtype = messagetype
+       params.SubId = -1
+       params.Xid = ""
+       params.Meid = &xapp.RMRMeid{}
+       params.Src = a1SourceName
+       params.PayloadLen = len([]byte(httpBodyString))
+       params.Payload = []byte(httpBodyString)
+       a1.Logger.Debug("MSG to XAPP: %s ", params.String())
+       a1.Logger.Debug("len payload %+v", len(params.Payload))
+       s := rmr.rmrclient.SendMsg(params)
+       a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
+       return s
+}
+
+func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
+       a1.Logger.Debug("In the Consume function")
+       id := rmr.GetRicMessageName(msg.Mtype)
+       a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
+
+       switch id {
+
+       case "A1_POLICY_RESP":
+               a1.Logger.Debug("Recived policy responose")
+               payload := msg.Payload
+               a1.Logger.Debug("message recieved : %s", payload)
+               var result map[string]interface{}
+               err := json.Unmarshal([]byte(payload), &result)
+               if err != nil {
+                       a1.Logger.Error("Unmarshal error : %+v", err)
+                       return err
+               }
+               a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
+               rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
+       case "A1_POLICY_QUERY":
+               a1.Logger.Debug("Recived policy query")
+               a1.Logger.Debug("message recieved ", msg.Payload)
+               payload := msg.Payload
+               var result map[string]interface{}
+               json.Unmarshal([]byte(payload), &result)
+               a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
+               policytypeid := (result["policy_type_id"].(float64))
+               instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
+               if err1 != nil {
+                       a1.Logger.Error("Error : %+v", err1)
+                       return err1
+               }
+               a1.Logger.Debug("instanceList ", instanceList)
+               a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
+               for _, policyinstanceid := range instanceList {
+                       policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
+                       if err2 != nil {
+                               a1.Logger.Error("Error : %+v", err2)
+                               return err2
+                       }
+                       a1.Logger.Debug("policyinstance ", policyinstance.(string))
+                       message := Message{}
+                       rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
+                       if err1 != nil {
+                               a1.Logger.Error("error : %v", err1)
+                               return err1
+                       }
+                       a1.Logger.Debug("rmrMessage ", rmrMessage)
+                       isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
+                       if isSent {
+                               a1.Logger.Debug("rmrSendToXapp : message sent")
+                       } else {
+                               a1.Logger.Error("rmrSendToXapp : message not sent")
+                       }
+               }
+
+       case "A1_EI_QUERY_ALL":
+               a1.Logger.Debug("message recieved ", msg.Payload)
+               resp, err := http.Get(ecsEiTypePath)
+               if err != nil {
+                       a1.Logger.Error("Received error while fetching health info: %v", err)
+               }
+               if resp.StatusCode != http.StatusOK {
+                       a1.Logger.Warning("Received no reponse from A1-EI service1")
+               }
+               a1.Logger.Debug("response from A1-EI service : ", resp)
+
+               defer resp.Body.Close()
+               respByte, err := ioutil.ReadAll(resp.Body)
+
+               if err != nil {
+                       a1.Logger.Debug("error in response: %+v", respByte)
+               }
+
+               a1.Logger.Debug("response : %+v", string(respByte))
+
+               isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
+               if isSent {
+                       a1.Logger.Debug("rmrSendToXapp : message sent")
+               } else {
+                       a1.Logger.Error("rmrSendToXapp : message not sent")
+               }
+       case "A1_EI_CREATE_JOB":
+               payload := msg.Payload
+               a1.Logger.Debug("message recieved : %s", payload)
+
+               var result map[string]interface{}
+
+               err := json.Unmarshal([]byte(payload), &result)
+               if err != nil {
+                       a1.Logger.Error("Unmarshal error : %+v", err)
+                       return err
+               }
+               a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
+
+               jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
+               jsonReq, err := json.Marshal(result)
+               if err != nil {
+                       a1.Logger.Error("marshal error : %v", err)
+                       return err
+               }
+
+               a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
+               req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
+               if err != nil {
+                       a1.Logger.Error("http error : %v", err)
+                       return err
+               }
+
+               req.Header.Set("Content-Type", "application/json; charset=utf-8")
+               client := &http.Client{}
+               resp, err3 := client.Do(req)
+               if err3 != nil {
+                       a1.Logger.Error("error:", err3)
+                       return err
+               }
+
+               defer resp.Body.Close()
+
+               a1.Logger.Debug("response status : ", resp.StatusCode)
+               if resp.StatusCode == 200 || resp.StatusCode == 201 {
+                       a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
+                       rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
+                       a1.Logger.Debug("rmr_Data to send: ", rmrData)
+
+                       isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp)
+                       if isSent {
+                               a1.Logger.Debug("rmrSendToXapp : message sent")
+                       } else {
+                               a1.Logger.Error("rmrSendToXapp : message not sent")
+                       }
+               } else {
+                       a1.Logger.Warning("failed to create EIJOB ")
+               }
+
+       default:
+               xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
+       }
+
+       defer func() {
+               rmr.rmrclient.Free(msg.Mbuf)
+               msg.Mbuf = nil
+       }()
+       return
+}
+
+func (rmr *RmrSender) RmrRecieveStart() {
+       a1.Logger.Debug("Inside RmrRecieveStart function ")
+       go rmr.rmrclient.Start(rmr)
+       a1.Logger.Debug("Reciever started")
+}