"net/http"
"strconv"
+ "gerrit.o-ran-sc.org/r/ric-plt/a1/config"
"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
a1EiQueryAllResp = 20014
a1EiCreateJobResp = 20016
jobCreationData = `{"ei_job_id": %s.}`
+ DefaultSubId = -1
)
type RmrSender struct {
}
type IRmrSender interface {
- RmrSendToXapp(httpBodyString string, messagetype int) bool
+ RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
}
func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
+ config := config.ParseConfiguration()
RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
StatDesc: "",
RmrData: xapp.PortData{
- //TODO: Read configuration from config file
- Name: "",
- MaxSize: 65534,
- ThreadType: 0,
- LowLatency: false,
- FastAck: false,
- MaxRetryOnFailure: 1,
- Port: 4561,
+
+ Name: config.Name,
+ MaxSize: config.MaxSize,
+ ThreadType: config.ThreadType,
+ LowLatency: config.LowLatency,
+ FastAck: config.FastAck,
+ MaxRetryOnFailure: config.MaxRetryOnFailure,
+ Port: config.Port,
},
})
return
}
-func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
+func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
params := &xapp.RMRParams{}
params.Mtype = messagetype
- params.SubId = -1
+ params.SubId = subid
params.Xid = ""
params.Meid = &xapp.RMRMeid{}
params.Src = a1SourceName
a1.Logger.Error("Unmarshal error : %+v", err)
return err
}
- a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
- rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
+ policyTypeId := int(result["policy_type_id"].(float64))
+ policyInstanceId := result["policy_instance_id"].(string)
+ policyHandlerId := result["handler_id"].(string)
+ policyStatus := result["status"].(string)
+
+ a1.Logger.Debug("message recieved for %d and %s with status : %s", policyTypeId, policyInstanceId, policyStatus)
+ rmr.policyManager.SetPolicyInstanceStatus(policyTypeId, policyInstanceId, policyStatus)
+ err = rmr.policyManager.SendPolicyStatusNotification(policyTypeId, policyInstanceId, policyHandlerId, policyStatus)
+ if err != nil {
+ a1.Logger.Debug("failed to send policy status notification %v+", err)
+ }
+
case "A1_POLICY_QUERY":
a1.Logger.Debug("Recived policy query")
a1.Logger.Debug("message recieved ", msg.Payload)
return err1
}
a1.Logger.Debug("rmrMessage ", rmrMessage)
- isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
+ isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
if isSent {
a1.Logger.Debug("rmrSendToXapp : message sent")
} else {
a1.Logger.Debug("response : %+v", string(respByte))
- isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
+ isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
if isSent {
a1.Logger.Debug("rmrSendToXapp : message sent")
} else {
rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
a1.Logger.Debug("rmr_Data to send: ", rmrData)
- isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp)
+ isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
if isSent {
a1.Logger.Debug("rmrSendToXapp : message sent")
} else {