2 ==================================================================================
3 Copyright (c) 2022 Samsung
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 platform project (RICP).
19 ==================================================================================
32 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
33 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
34 "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
35 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
39 a1SourceName = "service-ricplt-a1mediator-http"
40 a1PolicyRequest = 20010
41 ecsServiceHost = "http://ecs-service:8083"
42 ecsEiTypePath = ecsServiceHost + "/A1-EI/v1/eitypes"
43 ecsEiJobPath = ecsServiceHost + "/A1-EI/v1/eijobs/"
44 a1EiQueryAllResp = 20014
45 a1EiCreateJobResp = 20016
46 jobCreationData = `{"ei_job_id": %s.}`
50 type RmrSender struct {
51 rmrclient *xapp.RMRClient
52 policyManager *policy.PolicyManager
55 type IRmrSender interface {
56 RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
59 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
60 RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
62 RmrData: xapp.PortData{
63 //TODO: Read configuration from config file
74 rmrsender := &RmrSender{
76 policyManager: policyManager,
79 rmrsender.RmrRecieveStart()
83 var RICMessageTypes = map[string]int{
84 "A1_POLICY_REQ": 20010,
85 "A1_POLICY_RESP": 20011,
86 "A1_POLICY_QUERY": 20012,
87 "A1_EI_QUERY_ALL": 20013,
88 "AI_EI_QUERY_ALL_RESP": 20014,
89 "A1_EI_CREATE_JOB": 20015,
90 "A1_EI_CREATE_JOB_RESP": 20016,
91 "A1_EI_DATA_DELIVERY": 20017,
94 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
95 for k, v := range RICMessageTypes {
103 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
105 params := &xapp.RMRParams{}
106 params.Mtype = messagetype
109 params.Meid = &xapp.RMRMeid{}
110 params.Src = a1SourceName
111 params.PayloadLen = len([]byte(httpBodyString))
112 params.Payload = []byte(httpBodyString)
113 a1.Logger.Debug("MSG to XAPP: %s ", params.String())
114 a1.Logger.Debug("len payload %+v", len(params.Payload))
115 s := rmr.rmrclient.SendMsg(params)
116 a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
120 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
121 a1.Logger.Debug("In the Consume function")
122 id := rmr.GetRicMessageName(msg.Mtype)
123 a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
127 case "A1_POLICY_RESP":
128 a1.Logger.Debug("Recived policy responose")
129 payload := msg.Payload
130 a1.Logger.Debug("message recieved : %s", payload)
131 var result map[string]interface{}
132 err := json.Unmarshal([]byte(payload), &result)
134 a1.Logger.Error("Unmarshal error : %+v", err)
137 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
138 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
139 case "A1_POLICY_QUERY":
140 a1.Logger.Debug("Recived policy query")
141 a1.Logger.Debug("message recieved ", msg.Payload)
142 payload := msg.Payload
143 var result map[string]interface{}
144 json.Unmarshal([]byte(payload), &result)
145 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
146 policytypeid := (result["policy_type_id"].(float64))
147 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
149 a1.Logger.Error("Error : %+v", err1)
152 a1.Logger.Debug("instanceList ", instanceList)
153 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
154 for _, policyinstanceid := range instanceList {
155 policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
157 a1.Logger.Error("Error : %+v", err2)
160 a1.Logger.Debug("policyinstance ", policyinstance.(string))
162 rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
164 a1.Logger.Error("error : %v", err1)
167 a1.Logger.Debug("rmrMessage ", rmrMessage)
168 isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
170 a1.Logger.Debug("rmrSendToXapp : message sent")
172 a1.Logger.Error("rmrSendToXapp : message not sent")
176 case "A1_EI_QUERY_ALL":
177 a1.Logger.Debug("message recieved ", msg.Payload)
178 resp, err := http.Get(ecsEiTypePath)
180 a1.Logger.Error("Received error while fetching health info: %v", err)
182 if resp.StatusCode != http.StatusOK {
183 a1.Logger.Warning("Received no reponse from A1-EI service1")
185 a1.Logger.Debug("response from A1-EI service : ", resp)
187 defer resp.Body.Close()
188 respByte, err := ioutil.ReadAll(resp.Body)
191 a1.Logger.Debug("error in response: %+v", respByte)
194 a1.Logger.Debug("response : %+v", string(respByte))
196 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
198 a1.Logger.Debug("rmrSendToXapp : message sent")
200 a1.Logger.Error("rmrSendToXapp : message not sent")
202 case "A1_EI_CREATE_JOB":
203 payload := msg.Payload
204 a1.Logger.Debug("message recieved : %s", payload)
206 var result map[string]interface{}
208 err := json.Unmarshal([]byte(payload), &result)
210 a1.Logger.Error("Unmarshal error : %+v", err)
213 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
215 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
216 jsonReq, err := json.Marshal(result)
218 a1.Logger.Error("marshal error : %v", err)
222 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
223 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
225 a1.Logger.Error("http error : %v", err)
229 req.Header.Set("Content-Type", "application/json; charset=utf-8")
230 client := &http.Client{}
231 resp, err3 := client.Do(req)
233 a1.Logger.Error("error:", err3)
237 defer resp.Body.Close()
239 a1.Logger.Debug("response status : ", resp.StatusCode)
240 if resp.StatusCode == 200 || resp.StatusCode == 201 {
241 a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
242 rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
243 a1.Logger.Debug("rmr_Data to send: ", rmrData)
245 isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
247 a1.Logger.Debug("rmrSendToXapp : message sent")
249 a1.Logger.Error("rmrSendToXapp : message not sent")
252 a1.Logger.Warning("failed to create EIJOB ")
256 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
260 rmr.rmrclient.Free(msg.Mbuf)
266 func (rmr *RmrSender) RmrRecieveStart() {
267 a1.Logger.Debug("Inside RmrRecieveStart function ")
268 go rmr.rmrclient.Start(rmr)
269 a1.Logger.Debug("Reciever started")