2 ==================================================================================
3 Copyright (c) 2022 Samsung
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 platform project (RICP).
19 ==================================================================================
32 "gerrit.o-ran-sc.org/r/ric-plt/a1/config"
33 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
34 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
35 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
36 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
40 a1SourceName = "service-ricplt-a1mediator-http"
41 a1PolicyRequest = 20010
42 ecsServiceHost = "http://ecs-service:8083"
43 ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes"
44 ecsEiJobPath = ecsServiceHost + "/A1-EI/v1/eijobs/"
45 a1EiQueryAllResp = 20014
46 a1EiCreateJobResp = 20016
47 jobCreationData = `{"ei_job_id": %s.}`
51 type RmrSender struct {
52 rmrclient *xapp.RMRClient
53 policyManager *policy.PolicyManager
56 type IRmrSender interface {
57 RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
60 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
61 config := config.ParseConfiguration()
62 RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
64 RmrData: xapp.PortData{
67 MaxSize: config.MaxSize,
68 ThreadType: config.ThreadType,
69 LowLatency: config.LowLatency,
70 FastAck: config.FastAck,
71 MaxRetryOnFailure: config.MaxRetryOnFailure,
76 rmrsender := &RmrSender{
78 policyManager: policyManager,
81 rmrsender.RmrRecieveStart()
85 var RICMessageTypes = map[string]int{
86 "A1_POLICY_REQ": 20010,
87 "A1_POLICY_RESP": 20011,
88 "A1_POLICY_QUERY": 20012,
89 "A1_EI_QUERY_ALL": 20013,
90 "AI_EI_QUERY_ALL_RESP": 20014,
91 "A1_EI_CREATE_JOB": 20015,
92 "A1_EI_CREATE_JOB_RESP": 20016,
93 "A1_EI_DATA_DELIVERY": 20017,
96 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
97 for k, v := range RICMessageTypes {
105 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
107 params := &xapp.RMRParams{}
108 params.Mtype = messagetype
111 params.Meid = &xapp.RMRMeid{}
112 params.Src = a1SourceName
113 params.PayloadLen = len([]byte(httpBodyString))
114 params.Payload = []byte(httpBodyString)
115 a1.Logger.Debug("MSG to XAPP: %s ", params.String())
116 a1.Logger.Debug("len payload %+v", len(params.Payload))
117 s := rmr.rmrclient.SendMsg(params)
118 a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
122 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
123 a1.Logger.Debug("In the Consume function")
124 id := rmr.GetRicMessageName(msg.Mtype)
125 a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
129 case "A1_POLICY_RESP":
130 a1.Logger.Debug("Recived policy responose")
131 payload := msg.Payload
132 a1.Logger.Debug("message recieved : %s", payload)
133 var result map[string]interface{}
134 err := json.Unmarshal([]byte(payload), &result)
136 a1.Logger.Error("Unmarshal error : %+v", err)
139 policyTypeId := int(result["policy_type_id"].(float64))
140 policyInstanceId := result["policy_instance_id"].(string)
141 policyHandlerId := result["handler_id"].(string)
142 policyStatus := result["status"].(string)
144 a1.Logger.Debug("message recieved for %d and %s with status : %s", policyTypeId, policyInstanceId, policyStatus)
145 rmr.policyManager.SetPolicyInstanceStatus(policyTypeId, policyInstanceId, policyStatus)
146 err = rmr.policyManager.SendPolicyStatusNotification(policyTypeId, policyInstanceId, policyHandlerId, policyStatus)
148 a1.Logger.Debug("failed to send policy status notification %v+", err)
151 case "A1_POLICY_QUERY":
152 a1.Logger.Debug("Recived policy query")
153 a1.Logger.Debug("message recieved ", msg.Payload)
154 payload := msg.Payload
155 var result map[string]interface{}
156 json.Unmarshal([]byte(payload), &result)
157 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
158 policytypeid := (result["policy_type_id"].(float64))
159 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
161 a1.Logger.Error("Error : %+v", err1)
164 a1.Logger.Debug("instanceList ", instanceList)
165 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
166 for _, policyinstanceid := range instanceList {
167 policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
169 a1.Logger.Error("Error : %+v", err2)
172 a1.Logger.Debug("policyinstance ", policyinstance.(string))
174 rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
176 a1.Logger.Error("error : %v", err1)
179 a1.Logger.Debug("rmrMessage ", rmrMessage)
180 isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
182 a1.Logger.Debug("rmrSendToXapp : message sent")
184 a1.Logger.Error("rmrSendToXapp : message not sent")
188 case "A1_EI_QUERY_ALL":
189 a1.Logger.Debug("message recieved ", msg.Payload)
190 resp, err := http.Get(ecsEiTypePath)
192 a1.Logger.Error("Received error while fetching health info: %v", err)
194 if resp.StatusCode != http.StatusOK {
195 a1.Logger.Warning("Received no reponse from A1-EI service1")
197 a1.Logger.Debug("response from A1-EI service : ", resp)
199 defer resp.Body.Close()
200 respByte, err := ioutil.ReadAll(resp.Body)
203 a1.Logger.Debug("error in response: %+v", respByte)
206 a1.Logger.Debug("response : %+v", string(respByte))
208 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
210 a1.Logger.Debug("rmrSendToXapp : message sent")
212 a1.Logger.Error("rmrSendToXapp : message not sent")
214 case "A1_EI_CREATE_JOB":
215 payload := msg.Payload
216 a1.Logger.Debug("message recieved : %s", payload)
218 var result map[string]interface{}
220 err := json.Unmarshal([]byte(payload), &result)
222 a1.Logger.Error("Unmarshal error : %+v", err)
225 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
227 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
228 jsonReq, err := json.Marshal(result)
230 a1.Logger.Error("marshal error : %v", err)
234 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
235 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
237 a1.Logger.Error("http error : %v", err)
241 req.Header.Set("Content-Type", "application/json; charset=utf-8")
242 client := &http.Client{}
243 resp, err3 := client.Do(req)
245 a1.Logger.Error("error:", err3)
249 defer resp.Body.Close()
251 a1.Logger.Debug("response status : ", resp.StatusCode)
252 if resp.StatusCode == 200 || resp.StatusCode == 201 {
253 a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
254 rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
255 a1.Logger.Debug("rmr_Data to send: ", rmrData)
257 isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
259 a1.Logger.Debug("rmrSendToXapp : message sent")
261 a1.Logger.Error("rmrSendToXapp : message not sent")
264 a1.Logger.Warning("failed to create EIJOB ")
268 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
272 rmr.rmrclient.Free(msg.Mbuf)
278 func (rmr *RmrSender) RmrRecieveStart() {
279 a1.Logger.Debug("Inside RmrRecieveStart function ")
280 go rmr.rmrclient.Start(rmr)
281 a1.Logger.Debug("Reciever started")