package rmr
import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+
"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+ "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
+ "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
)
const (
- a1PolicyRequest = 20010
- a1SourceName = "service-ricplt-a1mediator-http"
+ a1SourceName = "service-ricplt-a1mediator-http"
+ a1PolicyRequest = 20010
+ ecsServiceHost = "http://ecs-service:8083"
+ ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes"
+ a1EiQueryAllResp = 20014
)
type RmrSender struct {
- rmrclient *xapp.RMRClient
+ rmrclient *xapp.RMRClient
+ policyManager *policy.PolicyManager
}
type IRmrSender interface {
- RmrSendToXapp(httpBodyString string) bool
+ RmrSendToXapp(httpBodyString string, messagetype int) bool
}
-func NewRMRSender() IRmrSender {
+func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
StatDesc: "",
RmrData: xapp.PortData{
+ //TODO: Read configuration from config file
Name: "",
MaxSize: 65534,
ThreadType: 0,
LowLatency: false,
FastAck: false,
MaxRetryOnFailure: 1,
+ Port: 4561,
},
})
- return &RmrSender{
- rmrclient: RMRclient,
+
+ rmrsender := &RmrSender{
+ rmrclient: RMRclient,
+ policyManager: policyManager,
}
+
+ rmrsender.RmrRecieveStart()
+ return rmrsender
}
-func (rmr *RmrSender) RmrSendToXapp(httpBodyString string) bool {
+var RICMessageTypes = map[string]int{
+ "A1_POLICY_REQ": 20010,
+ "A1_POLICY_RESP": 20011,
+ "A1_POLICY_QUERY": 20012,
+ "A1_EI_QUERY_ALL": 20013,
+ "AI_EI_QUERY_ALL_RESP": 20014,
+ "A1_EI_CREATE_JOB": 20015,
+ "A1_EI_CREATE_JOB_RESP": 20016,
+ "A1_EI_DATA_DELIVERY": 20017,
+}
+
+func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
+ for k, v := range RICMessageTypes {
+ if id == v {
+ return k
+ }
+ }
+ return
+}
+
+func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
params := &xapp.RMRParams{}
- params.Mtype = a1PolicyRequest
+ params.Mtype = messagetype
params.SubId = -1
params.Xid = ""
params.Meid = &xapp.RMRMeid{}
a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
return s
}
+
+func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
+ a1.Logger.Debug("In the Consume function")
+ id := rmr.GetRicMessageName(msg.Mtype)
+ a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
+
+ switch id {
+
+ case "A1_POLICY_RESP":
+ a1.Logger.Debug("Recived policy responose")
+ payload := msg.Payload
+ a1.Logger.Debug("message recieved : %s", payload)
+ var result map[string]interface{}
+ err := json.Unmarshal([]byte(payload), &result)
+ if err != nil {
+ a1.Logger.Error("Unmarshal error : %+v", err)
+ return err
+ }
+ a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
+ rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
+ case "A1_POLICY_QUERY":
+ a1.Logger.Debug("Recived policy query")
+ a1.Logger.Debug("message recieved ", msg.Payload)
+ payload := msg.Payload
+ var result map[string]interface{}
+ json.Unmarshal([]byte(payload), &result)
+ a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
+ policytypeid := (result["policy_type_id"].(float64))
+ instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
+ if err1 != nil {
+ a1.Logger.Error("Error : %+v", err1)
+ return err1
+ }
+ a1.Logger.Debug("instanceList ", instanceList)
+ a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
+ for _, policyinstanceid := range instanceList {
+ policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
+ if err2 != nil {
+ a1.Logger.Error("Error : %+v", err2)
+ return err2
+ }
+ a1.Logger.Debug("policyinstance ", policyinstance.(string))
+ message := Message{}
+ rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
+ if err1 != nil {
+ a1.Logger.Error("error : %v", err1)
+ return err1
+ }
+ a1.Logger.Debug("rmrMessage ", rmrMessage)
+ isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
+ if isSent {
+ a1.Logger.Debug("rmrSendToXapp : message sent")
+ } else {
+ a1.Logger.Error("rmrSendToXapp : message not sent")
+ }
+ }
+
+ case "A1_EI_QUERY_ALL":
+ a1.Logger.Debug("message recieved ", msg.Payload)
+ resp, err := http.Get(ecsEiTypePath)
+ if err != nil {
+ a1.Logger.Error("Received error while fetching health info: %v", err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ a1.Logger.Warning("Received no reponse from A1-EI service1")
+ }
+ a1.Logger.Debug("response from A1-EI service : ", resp)
+
+ defer resp.Body.Close()
+ respByte, err := ioutil.ReadAll(resp.Body)
+
+ if err != nil {
+ a1.Logger.Debug("error in response: %+v", respByte)
+ }
+
+ a1.Logger.Debug("response : %+v", string(respByte))
+
+ isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
+ if isSent {
+ a1.Logger.Debug("rmrSendToXapp : message sent")
+ } else {
+ a1.Logger.Error("rmrSendToXapp : message not sent")
+ }
+
+ default:
+ xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
+ }
+
+ defer func() {
+ rmr.rmrclient.Free(msg.Mbuf)
+ msg.Mbuf = nil
+ }()
+ return
+}
+
+func (rmr *RmrSender) RmrRecieveStart() {
+ a1.Logger.Debug("Inside RmrRecieveStart function ")
+ rmr.rmrclient.Start(rmr)
+ a1.Logger.Debug("Reciever started")
+}