4f4aa41d7737dda00cc0f0d2618791ab307a97a8
[ric-plt/a1.git] / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "bytes"
26         "encoding/json"
27         "fmt"
28         "io/ioutil"
29         "net/http"
30         "strconv"
31
32         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
33         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
34         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36 )
37
38 const (
39         a1SourceName      = "service-ricplt-a1mediator-http"
40         a1PolicyRequest   = 20010
41         ecsServiceHost    = "http://ecs-service:8083"
42         ecsEiTypePath     = ecsServiceHost + "/A1-EI/v1/eitypes"
43         ecsEiJobPath      = ecsServiceHost + "/A1-EI/v1/eijobs/"
44         a1EiQueryAllResp  = 20014
45         a1EiCreateJobResp = 20016
46         jobCreationData   = `{"ei_job_id": %s.}`
47         DefaultSubId      = -1
48 )
49
50 type RmrSender struct {
51         rmrclient     *xapp.RMRClient
52         policyManager *policy.PolicyManager
53 }
54
55 type IRmrSender interface {
56         RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
57 }
58
59 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
60         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
61                 StatDesc: "",
62                 RmrData: xapp.PortData{
63                         //TODO: Read configuration from config file
64                         Name:              "",
65                         MaxSize:           65534,
66                         ThreadType:        0,
67                         LowLatency:        false,
68                         FastAck:           false,
69                         MaxRetryOnFailure: 1,
70                         Port:              4561,
71                 },
72         })
73
74         rmrsender := &RmrSender{
75                 rmrclient:     RMRclient,
76                 policyManager: policyManager,
77         }
78
79         rmrsender.RmrRecieveStart()
80         return rmrsender
81 }
82
83 var RICMessageTypes = map[string]int{
84         "A1_POLICY_REQ":         20010,
85         "A1_POLICY_RESP":        20011,
86         "A1_POLICY_QUERY":       20012,
87         "A1_EI_QUERY_ALL":       20013,
88         "AI_EI_QUERY_ALL_RESP":  20014,
89         "A1_EI_CREATE_JOB":      20015,
90         "A1_EI_CREATE_JOB_RESP": 20016,
91         "A1_EI_DATA_DELIVERY":   20017,
92 }
93
94 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
95         for k, v := range RICMessageTypes {
96                 if id == v {
97                         return k
98                 }
99         }
100         return
101 }
102
103 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
104
105         params := &xapp.RMRParams{}
106         params.Mtype = messagetype
107         params.SubId = subid
108         params.Xid = ""
109         params.Meid = &xapp.RMRMeid{}
110         params.Src = a1SourceName
111         params.PayloadLen = len([]byte(httpBodyString))
112         params.Payload = []byte(httpBodyString)
113         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
114         a1.Logger.Debug("len payload %+v", len(params.Payload))
115         s := rmr.rmrclient.SendMsg(params)
116         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
117         return s
118 }
119
120 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
121         a1.Logger.Debug("In the Consume function")
122         id := rmr.GetRicMessageName(msg.Mtype)
123         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
124
125         switch id {
126
127         case "A1_POLICY_RESP":
128                 a1.Logger.Debug("Recived policy responose")
129                 payload := msg.Payload
130                 a1.Logger.Debug("message recieved : %s", payload)
131                 var result map[string]interface{}
132                 err := json.Unmarshal([]byte(payload), &result)
133                 if err != nil {
134                         a1.Logger.Error("Unmarshal error : %+v", err)
135                         return err
136                 }
137                 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
138                 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
139         case "A1_POLICY_QUERY":
140                 a1.Logger.Debug("Recived policy query")
141                 a1.Logger.Debug("message recieved ", msg.Payload)
142                 payload := msg.Payload
143                 var result map[string]interface{}
144                 json.Unmarshal([]byte(payload), &result)
145                 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
146                 policytypeid := (result["policy_type_id"].(float64))
147                 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
148                 if err1 != nil {
149                         a1.Logger.Error("Error : %+v", err1)
150                         return err1
151                 }
152                 a1.Logger.Debug("instanceList ", instanceList)
153                 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
154                 for _, policyinstanceid := range instanceList {
155                         policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
156                         if err2 != nil {
157                                 a1.Logger.Error("Error : %+v", err2)
158                                 return err2
159                         }
160                         a1.Logger.Debug("policyinstance ", policyinstance.(string))
161                         message := Message{}
162                         rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
163                         if err1 != nil {
164                                 a1.Logger.Error("error : %v", err1)
165                                 return err1
166                         }
167                         a1.Logger.Debug("rmrMessage ", rmrMessage)
168                         isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
169                         if isSent {
170                                 a1.Logger.Debug("rmrSendToXapp : message sent")
171                         } else {
172                                 a1.Logger.Error("rmrSendToXapp : message not sent")
173                         }
174                 }
175
176         case "A1_EI_QUERY_ALL":
177                 a1.Logger.Debug("message recieved ", msg.Payload)
178                 resp, err := http.Get(ecsEiTypePath)
179                 if err != nil {
180                         a1.Logger.Error("Received error while fetching health info: %v", err)
181                 }
182                 if resp.StatusCode != http.StatusOK {
183                         a1.Logger.Warning("Received no reponse from A1-EI service1")
184                 }
185                 a1.Logger.Debug("response from A1-EI service : ", resp)
186
187                 defer resp.Body.Close()
188                 respByte, err := ioutil.ReadAll(resp.Body)
189
190                 if err != nil {
191                         a1.Logger.Debug("error in response: %+v", respByte)
192                 }
193
194                 a1.Logger.Debug("response : %+v", string(respByte))
195
196                 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
197                 if isSent {
198                         a1.Logger.Debug("rmrSendToXapp : message sent")
199                 } else {
200                         a1.Logger.Error("rmrSendToXapp : message not sent")
201                 }
202         case "A1_EI_CREATE_JOB":
203                 payload := msg.Payload
204                 a1.Logger.Debug("message recieved : %s", payload)
205
206                 var result map[string]interface{}
207
208                 err := json.Unmarshal([]byte(payload), &result)
209                 if err != nil {
210                         a1.Logger.Error("Unmarshal error : %+v", err)
211                         return err
212                 }
213                 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
214
215                 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
216                 jsonReq, err := json.Marshal(result)
217                 if err != nil {
218                         a1.Logger.Error("marshal error : %v", err)
219                         return err
220                 }
221
222                 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
223                 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
224                 if err != nil {
225                         a1.Logger.Error("http error : %v", err)
226                         return err
227                 }
228
229                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
230                 client := &http.Client{}
231                 resp, err3 := client.Do(req)
232                 if err3 != nil {
233                         a1.Logger.Error("error:", err3)
234                         return err
235                 }
236
237                 defer resp.Body.Close()
238
239                 a1.Logger.Debug("response status : ", resp.StatusCode)
240                 if resp.StatusCode == 200 || resp.StatusCode == 201 {
241                         a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
242                         rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
243                         a1.Logger.Debug("rmr_Data to send: ", rmrData)
244
245                         isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
246                         if isSent {
247                                 a1.Logger.Debug("rmrSendToXapp : message sent")
248                         } else {
249                                 a1.Logger.Error("rmrSendToXapp : message not sent")
250                         }
251                 } else {
252                         a1.Logger.Warning("failed to create EIJOB ")
253                 }
254
255         default:
256                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
257         }
258
259         defer func() {
260                 rmr.rmrclient.Free(msg.Mbuf)
261                 msg.Mbuf = nil
262         }()
263         return
264 }
265
266 func (rmr *RmrSender) RmrRecieveStart() {
267         a1.Logger.Debug("Inside RmrRecieveStart function ")
268         go rmr.rmrclient.Start(rmr)
269         a1.Logger.Debug("Reciever started")
270 }