Moving RMR message reciver into go routine
[ric-plt/a1.git] / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "bytes"
26         "encoding/json"
27         "fmt"
28         "io/ioutil"
29         "net/http"
30         "strconv"
31
32         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
33         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
34         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36 )
37
38 const (
39         a1SourceName      = "service-ricplt-a1mediator-http"
40         a1PolicyRequest   = 20010
41         ecsServiceHost    = "http://ecs-service:8083"
42         ecsEiTypePath     = ecsServiceHost + "/A1-EI/v1/eitypes"
43         ecsEiJobPath      = ecsServiceHost + "/A1-EI/v1/eijobs/"
44         a1EiQueryAllResp  = 20014
45         a1EiCreateJobResp = 20016
46         jobCreationData   = `{"ei_job_id": %s.}`
47 )
48
49 type RmrSender struct {
50         rmrclient     *xapp.RMRClient
51         policyManager *policy.PolicyManager
52 }
53
54 type IRmrSender interface {
55         RmrSendToXapp(httpBodyString string, messagetype int) bool
56 }
57
58 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
59         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
60                 StatDesc: "",
61                 RmrData: xapp.PortData{
62                         //TODO: Read configuration from config file
63                         Name:              "",
64                         MaxSize:           65534,
65                         ThreadType:        0,
66                         LowLatency:        false,
67                         FastAck:           false,
68                         MaxRetryOnFailure: 1,
69                         Port:              4561,
70                 },
71         })
72
73         rmrsender := &RmrSender{
74                 rmrclient:     RMRclient,
75                 policyManager: policyManager,
76         }
77
78         rmrsender.RmrRecieveStart()
79         return rmrsender
80 }
81
82 var RICMessageTypes = map[string]int{
83         "A1_POLICY_REQ":         20010,
84         "A1_POLICY_RESP":        20011,
85         "A1_POLICY_QUERY":       20012,
86         "A1_EI_QUERY_ALL":       20013,
87         "AI_EI_QUERY_ALL_RESP":  20014,
88         "A1_EI_CREATE_JOB":      20015,
89         "A1_EI_CREATE_JOB_RESP": 20016,
90         "A1_EI_DATA_DELIVERY":   20017,
91 }
92
93 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
94         for k, v := range RICMessageTypes {
95                 if id == v {
96                         return k
97                 }
98         }
99         return
100 }
101
102 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
103
104         params := &xapp.RMRParams{}
105         params.Mtype = messagetype
106         params.SubId = -1
107         params.Xid = ""
108         params.Meid = &xapp.RMRMeid{}
109         params.Src = a1SourceName
110         params.PayloadLen = len([]byte(httpBodyString))
111         params.Payload = []byte(httpBodyString)
112         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
113         a1.Logger.Debug("len payload %+v", len(params.Payload))
114         s := rmr.rmrclient.SendMsg(params)
115         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
116         return s
117 }
118
119 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
120         a1.Logger.Debug("In the Consume function")
121         id := rmr.GetRicMessageName(msg.Mtype)
122         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
123
124         switch id {
125
126         case "A1_POLICY_RESP":
127                 a1.Logger.Debug("Recived policy responose")
128                 payload := msg.Payload
129                 a1.Logger.Debug("message recieved : %s", payload)
130                 var result map[string]interface{}
131                 err := json.Unmarshal([]byte(payload), &result)
132                 if err != nil {
133                         a1.Logger.Error("Unmarshal error : %+v", err)
134                         return err
135                 }
136                 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
137                 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
138         case "A1_POLICY_QUERY":
139                 a1.Logger.Debug("Recived policy query")
140                 a1.Logger.Debug("message recieved ", msg.Payload)
141                 payload := msg.Payload
142                 var result map[string]interface{}
143                 json.Unmarshal([]byte(payload), &result)
144                 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
145                 policytypeid := (result["policy_type_id"].(float64))
146                 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
147                 if err1 != nil {
148                         a1.Logger.Error("Error : %+v", err1)
149                         return err1
150                 }
151                 a1.Logger.Debug("instanceList ", instanceList)
152                 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
153                 for _, policyinstanceid := range instanceList {
154                         policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
155                         if err2 != nil {
156                                 a1.Logger.Error("Error : %+v", err2)
157                                 return err2
158                         }
159                         a1.Logger.Debug("policyinstance ", policyinstance.(string))
160                         message := Message{}
161                         rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
162                         if err1 != nil {
163                                 a1.Logger.Error("error : %v", err1)
164                                 return err1
165                         }
166                         a1.Logger.Debug("rmrMessage ", rmrMessage)
167                         isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
168                         if isSent {
169                                 a1.Logger.Debug("rmrSendToXapp : message sent")
170                         } else {
171                                 a1.Logger.Error("rmrSendToXapp : message not sent")
172                         }
173                 }
174
175         case "A1_EI_QUERY_ALL":
176                 a1.Logger.Debug("message recieved ", msg.Payload)
177                 resp, err := http.Get(ecsEiTypePath)
178                 if err != nil {
179                         a1.Logger.Error("Received error while fetching health info: %v", err)
180                 }
181                 if resp.StatusCode != http.StatusOK {
182                         a1.Logger.Warning("Received no reponse from A1-EI service1")
183                 }
184                 a1.Logger.Debug("response from A1-EI service : ", resp)
185
186                 defer resp.Body.Close()
187                 respByte, err := ioutil.ReadAll(resp.Body)
188
189                 if err != nil {
190                         a1.Logger.Debug("error in response: %+v", respByte)
191                 }
192
193                 a1.Logger.Debug("response : %+v", string(respByte))
194
195                 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
196                 if isSent {
197                         a1.Logger.Debug("rmrSendToXapp : message sent")
198                 } else {
199                         a1.Logger.Error("rmrSendToXapp : message not sent")
200                 }
201         case "A1_EI_CREATE_JOB":
202                 payload := msg.Payload
203                 a1.Logger.Debug("message recieved : %s", payload)
204
205                 var result map[string]interface{}
206
207                 err := json.Unmarshal([]byte(payload), &result)
208                 if err != nil {
209                         a1.Logger.Error("Unmarshal error : %+v", err)
210                         return err
211                 }
212                 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
213
214                 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
215                 jsonReq, err := json.Marshal(result)
216                 if err != nil {
217                         a1.Logger.Error("marshal error : %v", err)
218                         return err
219                 }
220
221                 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
222                 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
223                 if err != nil {
224                         a1.Logger.Error("http error : %v", err)
225                         return err
226                 }
227
228                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
229                 client := &http.Client{}
230                 resp, err3 := client.Do(req)
231                 if err3 != nil {
232                         a1.Logger.Error("error:", err3)
233                         return err
234                 }
235
236                 defer resp.Body.Close()
237
238                 a1.Logger.Debug("response status : ", resp.StatusCode)
239                 if resp.StatusCode == 200 || resp.StatusCode == 201 {
240                         a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
241                         rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
242                         a1.Logger.Debug("rmr_Data to send: ", rmrData)
243
244                         isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp)
245                         if isSent {
246                                 a1.Logger.Debug("rmrSendToXapp : message sent")
247                         } else {
248                                 a1.Logger.Error("rmrSendToXapp : message not sent")
249                         }
250                 } else {
251                         a1.Logger.Warning("failed to create EIJOB ")
252                 }
253
254         default:
255                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
256         }
257
258         defer func() {
259                 rmr.rmrclient.Free(msg.Mbuf)
260                 msg.Mbuf = nil
261         }()
262         return
263 }
264
265 func (rmr *RmrSender) RmrRecieveStart() {
266         a1.Logger.Debug("Inside RmrRecieveStart function ")
267         go rmr.rmrclient.Start(rmr)
268         a1.Logger.Debug("Reciever started")
269 }