2 ==================================================================================
3 Copyright (c) 2022 Samsung
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 platform project (RICP).
19 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
31 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
32 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
37 a1SourceName = "service-ricplt-a1mediator-http"
38 a1PolicyRequest = 20010
39 ecsServiceHost = "http://ecs-service:8083"
40 ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes"
41 a1EiQueryAllResp = 20014
44 type RmrSender struct {
45 rmrclient *xapp.RMRClient
46 policyManager *policy.PolicyManager
49 type IRmrSender interface {
50 RmrSendToXapp(httpBodyString string, messagetype int) bool
53 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
54 RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
56 RmrData: xapp.PortData{
57 //TODO: Read configuration from config file
68 rmrsender := &RmrSender{
70 policyManager: policyManager,
73 rmrsender.RmrRecieveStart()
77 var RICMessageTypes = map[string]int{
78 "A1_POLICY_REQ": 20010,
79 "A1_POLICY_RESP": 20011,
80 "A1_POLICY_QUERY": 20012,
81 "A1_EI_QUERY_ALL": 20013,
82 "AI_EI_QUERY_ALL_RESP": 20014,
83 "A1_EI_CREATE_JOB": 20015,
84 "A1_EI_CREATE_JOB_RESP": 20016,
85 "A1_EI_DATA_DELIVERY": 20017,
88 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
89 for k, v := range RICMessageTypes {
97 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
99 params := &xapp.RMRParams{}
100 params.Mtype = messagetype
103 params.Meid = &xapp.RMRMeid{}
104 params.Src = a1SourceName
105 params.PayloadLen = len([]byte(httpBodyString))
106 params.Payload = []byte(httpBodyString)
107 a1.Logger.Debug("MSG to XAPP: %s ", params.String())
108 a1.Logger.Debug("len payload %+v", len(params.Payload))
109 s := rmr.rmrclient.SendMsg(params)
110 a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
114 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
115 a1.Logger.Debug("In the Consume function")
116 id := rmr.GetRicMessageName(msg.Mtype)
117 a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
121 case "A1_POLICY_RESP":
122 a1.Logger.Debug("Recived policy responose")
123 payload := msg.Payload
124 a1.Logger.Debug("message recieved : %s", payload)
125 var result map[string]interface{}
126 err := json.Unmarshal([]byte(payload), &result)
128 a1.Logger.Error("Unmarshal error : %+v", err)
131 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
132 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
133 case "A1_POLICY_QUERY":
134 a1.Logger.Debug("Recived policy query")
135 a1.Logger.Debug("message recieved ", msg.Payload)
136 payload := msg.Payload
137 var result map[string]interface{}
138 json.Unmarshal([]byte(payload), &result)
139 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
140 policytypeid := (result["policy_type_id"].(float64))
141 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
143 a1.Logger.Error("Error : %+v", err1)
146 a1.Logger.Debug("instanceList ", instanceList)
147 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
148 for _, policyinstanceid := range instanceList {
149 policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
151 a1.Logger.Error("Error : %+v", err2)
154 a1.Logger.Debug("policyinstance ", policyinstance.(string))
156 rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
158 a1.Logger.Error("error : %v", err1)
161 a1.Logger.Debug("rmrMessage ", rmrMessage)
162 isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
164 a1.Logger.Debug("rmrSendToXapp : message sent")
166 a1.Logger.Error("rmrSendToXapp : message not sent")
170 case "A1_EI_QUERY_ALL":
171 a1.Logger.Debug("message recieved ", msg.Payload)
172 resp, err := http.Get(ecsEiTypePath)
174 a1.Logger.Error("Received error while fetching health info: %v", err)
176 if resp.StatusCode != http.StatusOK {
177 a1.Logger.Warning("Received no reponse from A1-EI service1")
179 a1.Logger.Debug("response from A1-EI service : ", resp)
181 defer resp.Body.Close()
182 respByte, err := ioutil.ReadAll(resp.Body)
185 a1.Logger.Debug("error in response: %+v", respByte)
188 a1.Logger.Debug("response : %+v", string(respByte))
190 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
192 a1.Logger.Debug("rmrSendToXapp : message sent")
194 a1.Logger.Error("rmrSendToXapp : message not sent")
198 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
202 rmr.rmrclient.Free(msg.Mbuf)
208 func (rmr *RmrSender) RmrRecieveStart() {
209 a1.Logger.Debug("Inside RmrRecieveStart function ")
210 rmr.rmrclient.Start(rmr)
211 a1.Logger.Debug("Reciever started")