/*
==================================================================================
  Copyright (c) 2022 Samsung

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

   This source code is part of the near-RT RIC (RAN Intelligent Controller)
   platform project (RICP).
==================================================================================
*/
32 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
33 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
34 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
35 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
// Constants used by the RMR sender and by the A1-EI handling in Consume.
const (
	// a1SourceName is written into the Src field of every RMR message sent to xApps.
	a1SourceName = "service-ricplt-a1mediator-http"
	// a1PolicyRequest is the RMR message type used when policy instances are
	// pushed to xApps in answer to a policy query.
	a1PolicyRequest = 20010
	// ecsServiceHost is the base URL of the A1-EI service.
	ecsServiceHost = "http://ecs-service:8083"
	// ecsEiTypePath is the URL fetched to answer an A1_EI_QUERY_ALL message.
	ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes"
	// ecsEiJobPath is the URL prefix for EI jobs; the job id is appended to it.
	ecsEiJobPath = ecsServiceHost + "/A1-EI/v1/eijobs/"
	// a1EiQueryAllResp is the RMR message type of the reply to A1_EI_QUERY_ALL.
	a1EiQueryAllResp = 20014
	// a1EiCreateJobResp is the RMR message type of the reply to A1_EI_CREATE_JOB.
	a1EiCreateJobResp = 20016
	// jobCreationData is the fmt template of the reply sent after an EI job
	// was created; %s receives the job id as a decimal string.
	// NOTE(review): the "." after %s makes the rendered text (for example
	// {"ei_job_id": 7.}) invalid JSON. Confirm the format the receiving xApp
	// expects before changing it.
	jobCreationData = `{"ei_job_id": %s.}`
)
49 type RmrSender struct {
50 rmrclient *xapp.RMRClient
51 policyManager *policy.PolicyManager
// IRmrSender is the sending side of the RMR integration.
type IRmrSender interface {
	// RmrSendToXapp sends httpBodyString as the payload of an RMR message of
	// type messagetype and reports whether the send succeeded.
	RmrSendToXapp(httpBodyString string, messagetype int) bool
}
58 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
59 RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
61 RmrData: xapp.PortData{
62 //TODO: Read configuration from config file
73 rmrsender := &RmrSender{
75 policyManager: policyManager,
78 rmrsender.RmrRecieveStart()
// RICMessageTypes maps the symbolic names of the A1 RMR messages to their
// numeric RMR message types. GetRicMessageName performs the reverse lookup,
// so every value must be unique.
var RICMessageTypes = map[string]int{
	"A1_POLICY_REQ":         20010,
	"A1_POLICY_RESP":        20011,
	"A1_POLICY_QUERY":       20012,
	"A1_EI_QUERY_ALL":       20013,
	"A1_EI_QUERY_ALL_RESP":  20014, // was misspelled "AI_EI_QUERY_ALL_RESP"
	"A1_EI_CREATE_JOB":      20015,
	"A1_EI_CREATE_JOB_RESP": 20016,
	"A1_EI_DATA_DELIVERY":   20017,
}
93 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
94 for k, v := range RICMessageTypes {
102 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
104 params := &xapp.RMRParams{}
105 params.Mtype = messagetype
108 params.Meid = &xapp.RMRMeid{}
109 params.Src = a1SourceName
110 params.PayloadLen = len([]byte(httpBodyString))
111 params.Payload = []byte(httpBodyString)
112 a1.Logger.Debug("MSG to XAPP: %s ", params.String())
113 a1.Logger.Debug("len payload %+v", len(params.Payload))
114 s := rmr.rmrclient.SendMsg(params)
115 a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
119 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
120 a1.Logger.Debug("In the Consume function")
121 id := rmr.GetRicMessageName(msg.Mtype)
122 a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
126 case "A1_POLICY_RESP":
127 a1.Logger.Debug("Recived policy responose")
128 payload := msg.Payload
129 a1.Logger.Debug("message recieved : %s", payload)
130 var result map[string]interface{}
131 err := json.Unmarshal([]byte(payload), &result)
133 a1.Logger.Error("Unmarshal error : %+v", err)
136 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
137 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
138 case "A1_POLICY_QUERY":
139 a1.Logger.Debug("Recived policy query")
140 a1.Logger.Debug("message recieved ", msg.Payload)
141 payload := msg.Payload
142 var result map[string]interface{}
143 json.Unmarshal([]byte(payload), &result)
144 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
145 policytypeid := (result["policy_type_id"].(float64))
146 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
148 a1.Logger.Error("Error : %+v", err1)
151 a1.Logger.Debug("instanceList ", instanceList)
152 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
153 for _, policyinstanceid := range instanceList {
154 policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
156 a1.Logger.Error("Error : %+v", err2)
159 a1.Logger.Debug("policyinstance ", policyinstance.(string))
161 rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
163 a1.Logger.Error("error : %v", err1)
166 a1.Logger.Debug("rmrMessage ", rmrMessage)
167 isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
169 a1.Logger.Debug("rmrSendToXapp : message sent")
171 a1.Logger.Error("rmrSendToXapp : message not sent")
175 case "A1_EI_QUERY_ALL":
176 a1.Logger.Debug("message recieved ", msg.Payload)
177 resp, err := http.Get(ecsEiTypePath)
179 a1.Logger.Error("Received error while fetching health info: %v", err)
181 if resp.StatusCode != http.StatusOK {
182 a1.Logger.Warning("Received no reponse from A1-EI service1")
184 a1.Logger.Debug("response from A1-EI service : ", resp)
186 defer resp.Body.Close()
187 respByte, err := ioutil.ReadAll(resp.Body)
190 a1.Logger.Debug("error in response: %+v", respByte)
193 a1.Logger.Debug("response : %+v", string(respByte))
195 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
197 a1.Logger.Debug("rmrSendToXapp : message sent")
199 a1.Logger.Error("rmrSendToXapp : message not sent")
201 case "A1_EI_CREATE_JOB":
202 payload := msg.Payload
203 a1.Logger.Debug("message recieved : %s", payload)
205 var result map[string]interface{}
207 err := json.Unmarshal([]byte(payload), &result)
209 a1.Logger.Error("Unmarshal error : %+v", err)
212 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
214 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
215 jsonReq, err := json.Marshal(result)
217 a1.Logger.Error("marshal error : %v", err)
221 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
222 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
224 a1.Logger.Error("http error : %v", err)
228 req.Header.Set("Content-Type", "application/json; charset=utf-8")
229 client := &http.Client{}
230 resp, err3 := client.Do(req)
232 a1.Logger.Error("error:", err3)
236 defer resp.Body.Close()
238 a1.Logger.Debug("response status : ", resp.StatusCode)
239 if resp.StatusCode == 200 || resp.StatusCode == 201 {
240 a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
241 rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
242 a1.Logger.Debug("rmr_Data to send: ", rmrData)
244 isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp)
246 a1.Logger.Debug("rmrSendToXapp : message sent")
248 a1.Logger.Error("rmrSendToXapp : message not sent")
251 a1.Logger.Warning("failed to create EIJOB ")
255 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
259 rmr.rmrclient.Free(msg.Mbuf)
265 func (rmr *RmrSender) RmrRecieveStart() {
266 a1.Logger.Debug("Inside RmrRecieveStart function ")
267 go rmr.rmrclient.Start(rmr)
268 a1.Logger.Debug("Reciever started")