Reading Rmr Configuration
[ric-plt/a1.git] / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "bytes"
26         "encoding/json"
27         "fmt"
28         "io/ioutil"
29         "net/http"
30         "strconv"
31
32         "gerrit.o-ran-sc.org/r/ric-plt/a1/config"
33         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
34         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
35         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
36         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
37 )
38
39 const (
40         a1SourceName      = "service-ricplt-a1mediator-http"
41         a1PolicyRequest   = 20010
42         ecsServiceHost    = "http://ecs-service:8083"
43         ecsEiTypePath     = ecsServiceHost + "/A1-EI/v1/eitypes"
44         ecsEiJobPath      = ecsServiceHost + "/A1-EI/v1/eijobs/"
45         a1EiQueryAllResp  = 20014
46         a1EiCreateJobResp = 20016
47         jobCreationData   = `{"ei_job_id": %s.}`
48         DefaultSubId      = -1
49 )
50
51 type RmrSender struct {
52         rmrclient     *xapp.RMRClient
53         policyManager *policy.PolicyManager
54 }
55
56 type IRmrSender interface {
57         RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
58 }
59
60 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
61         config := config.ParseConfiguration()
62         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
63                 StatDesc: "",
64                 RmrData: xapp.PortData{
65
66                         Name:              config.Name,
67                         MaxSize:           config.MaxSize,
68                         ThreadType:        config.ThreadType,
69                         LowLatency:        config.LowLatency,
70                         FastAck:           config.FastAck,
71                         MaxRetryOnFailure: config.MaxRetryOnFailure,
72                         Port:              config.Port,
73                 },
74         })
75
76         rmrsender := &RmrSender{
77                 rmrclient:     RMRclient,
78                 policyManager: policyManager,
79         }
80
81         rmrsender.RmrRecieveStart()
82         return rmrsender
83 }
84
85 var RICMessageTypes = map[string]int{
86         "A1_POLICY_REQ":         20010,
87         "A1_POLICY_RESP":        20011,
88         "A1_POLICY_QUERY":       20012,
89         "A1_EI_QUERY_ALL":       20013,
90         "AI_EI_QUERY_ALL_RESP":  20014,
91         "A1_EI_CREATE_JOB":      20015,
92         "A1_EI_CREATE_JOB_RESP": 20016,
93         "A1_EI_DATA_DELIVERY":   20017,
94 }
95
96 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
97         for k, v := range RICMessageTypes {
98                 if id == v {
99                         return k
100                 }
101         }
102         return
103 }
104
105 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
106
107         params := &xapp.RMRParams{}
108         params.Mtype = messagetype
109         params.SubId = subid
110         params.Xid = ""
111         params.Meid = &xapp.RMRMeid{}
112         params.Src = a1SourceName
113         params.PayloadLen = len([]byte(httpBodyString))
114         params.Payload = []byte(httpBodyString)
115         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
116         a1.Logger.Debug("len payload %+v", len(params.Payload))
117         s := rmr.rmrclient.SendMsg(params)
118         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
119         return s
120 }
121
122 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
123         a1.Logger.Debug("In the Consume function")
124         id := rmr.GetRicMessageName(msg.Mtype)
125         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
126
127         switch id {
128
129         case "A1_POLICY_RESP":
130                 a1.Logger.Debug("Recived policy responose")
131                 payload := msg.Payload
132                 a1.Logger.Debug("message recieved : %s", payload)
133                 var result map[string]interface{}
134                 err := json.Unmarshal([]byte(payload), &result)
135                 if err != nil {
136                         a1.Logger.Error("Unmarshal error : %+v", err)
137                         return err
138                 }
139                 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
140                 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
141         case "A1_POLICY_QUERY":
142                 a1.Logger.Debug("Recived policy query")
143                 a1.Logger.Debug("message recieved ", msg.Payload)
144                 payload := msg.Payload
145                 var result map[string]interface{}
146                 json.Unmarshal([]byte(payload), &result)
147                 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
148                 policytypeid := (result["policy_type_id"].(float64))
149                 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
150                 if err1 != nil {
151                         a1.Logger.Error("Error : %+v", err1)
152                         return err1
153                 }
154                 a1.Logger.Debug("instanceList ", instanceList)
155                 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
156                 for _, policyinstanceid := range instanceList {
157                         policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
158                         if err2 != nil {
159                                 a1.Logger.Error("Error : %+v", err2)
160                                 return err2
161                         }
162                         a1.Logger.Debug("policyinstance ", policyinstance.(string))
163                         message := Message{}
164                         rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
165                         if err1 != nil {
166                                 a1.Logger.Error("error : %v", err1)
167                                 return err1
168                         }
169                         a1.Logger.Debug("rmrMessage ", rmrMessage)
170                         isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
171                         if isSent {
172                                 a1.Logger.Debug("rmrSendToXapp : message sent")
173                         } else {
174                                 a1.Logger.Error("rmrSendToXapp : message not sent")
175                         }
176                 }
177
178         case "A1_EI_QUERY_ALL":
179                 a1.Logger.Debug("message recieved ", msg.Payload)
180                 resp, err := http.Get(ecsEiTypePath)
181                 if err != nil {
182                         a1.Logger.Error("Received error while fetching health info: %v", err)
183                 }
184                 if resp.StatusCode != http.StatusOK {
185                         a1.Logger.Warning("Received no reponse from A1-EI service1")
186                 }
187                 a1.Logger.Debug("response from A1-EI service : ", resp)
188
189                 defer resp.Body.Close()
190                 respByte, err := ioutil.ReadAll(resp.Body)
191
192                 if err != nil {
193                         a1.Logger.Debug("error in response: %+v", respByte)
194                 }
195
196                 a1.Logger.Debug("response : %+v", string(respByte))
197
198                 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
199                 if isSent {
200                         a1.Logger.Debug("rmrSendToXapp : message sent")
201                 } else {
202                         a1.Logger.Error("rmrSendToXapp : message not sent")
203                 }
204         case "A1_EI_CREATE_JOB":
205                 payload := msg.Payload
206                 a1.Logger.Debug("message recieved : %s", payload)
207
208                 var result map[string]interface{}
209
210                 err := json.Unmarshal([]byte(payload), &result)
211                 if err != nil {
212                         a1.Logger.Error("Unmarshal error : %+v", err)
213                         return err
214                 }
215                 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
216
217                 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
218                 jsonReq, err := json.Marshal(result)
219                 if err != nil {
220                         a1.Logger.Error("marshal error : %v", err)
221                         return err
222                 }
223
224                 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
225                 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
226                 if err != nil {
227                         a1.Logger.Error("http error : %v", err)
228                         return err
229                 }
230
231                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
232                 client := &http.Client{}
233                 resp, err3 := client.Do(req)
234                 if err3 != nil {
235                         a1.Logger.Error("error:", err3)
236                         return err
237                 }
238
239                 defer resp.Body.Close()
240
241                 a1.Logger.Debug("response status : ", resp.StatusCode)
242                 if resp.StatusCode == 200 || resp.StatusCode == 201 {
243                         a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
244                         rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
245                         a1.Logger.Debug("rmr_Data to send: ", rmrData)
246
247                         isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
248                         if isSent {
249                                 a1.Logger.Debug("rmrSendToXapp : message sent")
250                         } else {
251                                 a1.Logger.Error("rmrSendToXapp : message not sent")
252                         }
253                 } else {
254                         a1.Logger.Warning("failed to create EIJOB ")
255                 }
256
257         default:
258                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
259         }
260
261         defer func() {
262                 rmr.rmrclient.Free(msg.Mbuf)
263                 msg.Mbuf = nil
264         }()
265         return
266 }
267
268 func (rmr *RmrSender) RmrRecieveStart() {
269         a1.Logger.Debug("Inside RmrRecieveStart function ")
270         go rmr.rmrclient.Start(rmr)
271         a1.Logger.Debug("Reciever started")
272 }