2 ==================================================================================
3 Copyright (c) 2022 Samsung
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 platform project (RICP).
19 ==================================================================================
32 "gerrit.o-ran-sc.org/r/ric-plt/a1/config"
33 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
34 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
35 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
36 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
// Package-level constants used by the sender and by Consume. The opening and
// closing lines of the enclosing declaration are outside this excerpt.
//
// a1SourceName is written to the Src field of every outgoing RMR message.
40 a1SourceName = "service-ricplt-a1mediator-http"
// a1PolicyRequest is the message type used when policy instances are re-sent
// in answer to a policy query; same value as "A1_POLICY_REQ" in RICMessageTypes.
41 a1PolicyRequest = 20010
// ecsServiceHost is the base URL the two EI paths below are built from.
42 ecsServiceHost = "http://ecs-service:8083"
// ecsEiTypePath is the URL fetched with GET when an A1_EI_QUERY_ALL message arrives.
43 ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes"
// ecsEiJobPath is the URL prefix for the PUT issued on A1_EI_CREATE_JOB; the
// job id is appended directly, hence the trailing slash.
44 ecsEiJobPath = ecsServiceHost + "/A1-EI/v1/eijobs/"
// a1EiQueryAllResp is the message type of the reply sent for A1_EI_QUERY_ALL.
45 a1EiQueryAllResp = 20014
// a1EiCreateJobResp is the message type of the reply sent for A1_EI_CREATE_JOB.
46 a1EiCreateJobResp = 20016
// jobCreationData is the fmt template for the A1_EI_CREATE_JOB reply payload;
// %s receives the decimal job id string.
// NOTE(review): the '.' after %s yields text such as {"ei_job_id": 7.}, which
// is not valid JSON (a fraction needs at least one digit) — confirm whether
// any receiver depends on this before changing it.
47 jobCreationData = `{"ei_job_id": %s.}`
// RmrSender bundles the RMR client used to send, free and receive messages
// with the policy manager that Consume queries and updates.
51 type RmrSender struct {
// rmrclient is the client on which SendMsg, Free and Start are called.
52 rmrclient *xapp.RMRClient
// policyManager is used by Consume to read policy instances and to record
// policy instance status.
53 policyManager *policy.PolicyManager
// IRmrSender is the interface type returned by NewRMRSender; it exposes only
// the send operation.
56 type IRmrSender interface {
// RmrSendToXapp sends httpBodyString as the payload of a message of type
// messagetype. The boolean result is presumably the outcome reported by the
// RMR client — the return statement of the implementation is outside this
// excerpt; confirm.
57 RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
// NewRMRSender parses the configuration, creates an RMR client from it, wraps
// the client and policyManager in an RmrSender and starts message reception.
//
// Several lines of this function (some client parameters, the rmrclient field
// assignment and the return statement) are outside this excerpt.
60 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
// The local variable shadows the imported config package for the rest of
// the function.
61 config := config.ParseConfiguration()
62 RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
64 RmrData: xapp.PortData{
// Port settings are copied field by field from the parsed configuration.
67 MaxSize: config.MaxSize,
68 ThreadType: config.ThreadType,
69 LowLatency: config.LowLatency,
70 FastAck: config.FastAck,
71 MaxRetryOnFailure: config.MaxRetryOnFailure,
76 rmrsender := &RmrSender{
78 policyManager: policyManager,
// Reception is started before the sender is handed back to the caller.
81 rmrsender.RmrRecieveStart()
// RICMessageTypes maps RIC message names to their numeric message type ids.
// GetRicMessageName ranges over it, and the case labels in Consume use the
// same names.
85 var RICMessageTypes = map[string]int{
86 "A1_POLICY_REQ": 20010,
87 "A1_POLICY_RESP": 20011,
88 "A1_POLICY_QUERY": 20012,
89 "A1_EI_QUERY_ALL": 20013,
// NOTE(review): this key starts with "AI" while every other key starts with
// "A1" — looks like a typo; it is a runtime key on an exported variable, so
// confirm no caller looks it up before renaming.
90 "AI_EI_QUERY_ALL_RESP": 20014,
91 "A1_EI_CREATE_JOB": 20015,
92 "A1_EI_CREATE_JOB_RESP": 20016,
93 "A1_EI_DATA_DELIVERY": 20017,
// GetRicMessageName looks up a message name for the numeric type id by
// iterating over RICMessageTypes.
//
// The loop body and the return statements are outside this excerpt;
// presumably the key whose value equals id is returned, and the named result
// s stays empty when nothing matches — TODO confirm.
96 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
// Linear scan: the map is keyed by name, so a lookup by id has to visit entries.
97 for k, v := range RICMessageTypes {
// RmrSendToXapp builds an RMR message whose payload is httpBodyString and
// whose type is messagetype, then hands it to the RMR client's SendMsg.
//
// The statements that use subid and the return statement are outside this
// excerpt; presumably subid is stored in the params and the SendMsg result is
// returned — TODO confirm.
105 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
107 params := &xapp.RMRParams{}
108 params.Mtype = messagetype
// An empty Meid is attached; no RAN name is set in the visible lines.
111 params.Meid = &xapp.RMRMeid{}
112 params.Src = a1SourceName
// PayloadLen is the byte length of the body, matching the slice stored in Payload.
113 params.PayloadLen = len([]byte(httpBodyString))
114 params.Payload = []byte(httpBodyString)
115 a1.Logger.Debug("MSG to XAPP: %s ", params.String())
116 a1.Logger.Debug("len payload %+v", len(params.Payload))
117 s := rmr.rmrclient.SendMsg(params)
118 a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
// Consume handles one received RMR message. It resolves the message name from
// msg.Mtype and acts on it:
//   - A1_POLICY_RESP: stores the reported status of a policy instance.
//   - A1_POLICY_QUERY: re-sends every instance of the queried policy type.
//   - A1_EI_QUERY_ALL: fetches the EI types over HTTP and forwards the body.
//   - A1_EI_CREATE_JOB: PUTs the job over HTTP and reports the job id.
// Other types are logged as unknown. msg.Mbuf is handed to the client's Free
// at the end of the visible code.
//
// The switch header, most error guards, the if/else around the send results
// and the return statements are outside this excerpt, so the exact control
// flow after each logged error cannot be stated from here.
122 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
123 a1.Logger.Debug("In the Consume function")
124 id := rmr.GetRicMessageName(msg.Mtype)
// NOTE(review): msg.Meid is dereferenced without a nil check — confirm the
// receiver always populates it.
125 a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
129 case "A1_POLICY_RESP":
130 a1.Logger.Debug("Recived policy responose")
131 payload := msg.Payload
132 a1.Logger.Debug("message recieved : %s", payload)
// The payload is decoded into a generic map, so JSON numbers arrive as float64.
133 var result map[string]interface{}
// This err is a new variable that shadows the named result err.
134 err := json.Unmarshal([]byte(payload), &result)
136 a1.Logger.Error("Unmarshal error : %+v", err)
139 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
// NOTE(review): these type assertions are unchecked; a missing key or a value
// of another JSON type makes them panic.
140 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
141 case "A1_POLICY_QUERY":
142 a1.Logger.Debug("Recived policy query")
// NOTE(review): the format string has no verb for the extra argument; how it
// is rendered depends on the logger implementation, which is not shown. The
// same pattern recurs in several Debug/Error calls below.
143 a1.Logger.Debug("message recieved ", msg.Payload)
144 payload := msg.Payload
145 var result map[string]interface{}
// NOTE(review): the error returned by Unmarshal is discarded here; on a
// malformed payload result stays nil and the assertion below panics.
146 json.Unmarshal([]byte(payload), &result)
147 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
// Unchecked assertion: panics when policy_type_id is absent or not a number.
148 policytypeid := (result["policy_type_id"].(float64))
149 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
151 a1.Logger.Error("Error : %+v", err1)
154 a1.Logger.Debug("instanceList ", instanceList)
// policytypeid is a float64 here although the verb is %d.
155 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
// Each stored instance of the type is fetched and re-sent with the "CREATE" operation.
156 for _, policyinstanceid := range instanceList {
157 policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
159 a1.Logger.Error("Error : %+v", err2)
// Unchecked assertion: panics if the stored instance is not a string.
162 a1.Logger.Debug("policyinstance ", policyinstance.(string))
// `message` is not declared in the visible lines. This err1 is a new variable
// scoped to the loop body and shadows the err1 declared before the loop.
164 rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
166 a1.Logger.Error("error : %v", err1)
169 a1.Logger.Debug("rmrMessage ", rmrMessage)
// The policy type id doubles as the subscription id of the outgoing message.
170 isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
172 a1.Logger.Debug("rmrSendToXapp : message sent")
174 a1.Logger.Error("rmrSendToXapp : message not sent")
178 case "A1_EI_QUERY_ALL":
179 a1.Logger.Debug("message recieved ", msg.Payload)
// NOTE(review): http.Get uses the default client, which has no timeout, so a
// stalled EI service can block this handler.
180 resp, err := http.Get(ecsEiTypePath)
// NOTE(review): the text mentions "health info" although the request above
// targets the EI types path.
182 a1.Logger.Error("Received error while fetching health info: %v", err)
184 if resp.StatusCode != http.StatusOK {
185 a1.Logger.Warning("Received no reponse from A1-EI service1")
187 a1.Logger.Debug("response from A1-EI service : ", resp)
// The deferred close runs when Consume returns, not when this case ends.
189 defer resp.Body.Close()
190 respByte, err := ioutil.ReadAll(resp.Body)
// NOTE(review): this logs respByte, not err; the guard around it is not
// visible — confirm which value was intended.
193 a1.Logger.Debug("error in response: %+v", respByte)
196 a1.Logger.Debug("response : %+v", string(respByte))
// The HTTP body is forwarded unchanged. DefaultSubId is defined outside this excerpt.
198 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
200 a1.Logger.Debug("rmrSendToXapp : message sent")
202 a1.Logger.Error("rmrSendToXapp : message not sent")
204 case "A1_EI_CREATE_JOB":
205 payload := msg.Payload
206 a1.Logger.Debug("message recieved : %s", payload)
208 var result map[string]interface{}
210 err := json.Unmarshal([]byte(payload), &result)
212 a1.Logger.Error("Unmarshal error : %+v", err)
215 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
// The job id is read as a JSON number and rendered as a decimal integer; the
// unchecked assertion panics if "job-id" is absent or not a number.
217 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
// The decoded map is re-encoded and used as the PUT body.
218 jsonReq, err := json.Marshal(result)
220 a1.Logger.Error("marshal error : %v", err)
224 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
225 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
227 a1.Logger.Error("http error : %v", err)
231 req.Header.Set("Content-Type", "application/json; charset=utf-8")
// NOTE(review): a new client without a Timeout is created for every message.
232 client := &http.Client{}
233 resp, err3 := client.Do(req)
235 a1.Logger.Error("error:", err3)
239 defer resp.Body.Close()
241 a1.Logger.Debug("response status : ", resp.StatusCode)
// Only 200 and 201 are treated as success.
242 if resp.StatusCode == 200 || resp.StatusCode == 201 {
243 a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
244 rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
245 a1.Logger.Debug("rmr_Data to send: ", rmrData)
247 isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
249 a1.Logger.Debug("rmrSendToXapp : message sent")
251 a1.Logger.Error("rmrSendToXapp : message not sent")
254 a1.Logger.Warning("failed to create EIJOB ")
// Unlike the rest of this function, this message goes through xapp.Logger
// rather than a1.Logger.
258 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
// The message buffer is handed back to the RMR client.
262 rmr.rmrclient.Free(msg.Mbuf)
// RmrRecieveStart starts the RMR client in a new goroutine and returns
// immediately. The sender itself is passed to Start, so presumably its
// Consume method is what the client calls for each received message —
// TODO confirm against the xapp-frame API.
268 func (rmr *RmrSender) RmrRecieveStart() {
269 a1.Logger.Debug("Inside RmrRecieveStart function ")
// Nothing in the visible code stops or waits for this goroutine.
270 go rmr.rmrclient.Start(rmr)
// This is logged right after the goroutine is launched, without waiting for
// the client to report readiness.
271 a1.Logger.Debug("Reciever started")