Policy status notification handling - initial rollup
[ric-plt/a1.git] / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "bytes"
26         "encoding/json"
27         "fmt"
28         "io/ioutil"
29         "net/http"
30         "strconv"
31
32         "gerrit.o-ran-sc.org/r/ric-plt/a1/config"
33         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
34         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
35         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
36         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
37 )
38
39 const (
40         a1SourceName      = "service-ricplt-a1mediator-http"
41         a1PolicyRequest   = 20010
42         ecsServiceHost    = "http://ecs-service:8083"
43         ecsEiTypePath     = ecsServiceHost + "/A1-EI/v1/eitypes"
44         ecsEiJobPath      = ecsServiceHost + "/A1-EI/v1/eijobs/"
45         a1EiQueryAllResp  = 20014
46         a1EiCreateJobResp = 20016
47         jobCreationData   = `{"ei_job_id": %s.}`
48         DefaultSubId      = -1
49 )
50
51 type RmrSender struct {
52         rmrclient     *xapp.RMRClient
53         policyManager *policy.PolicyManager
54 }
55
56 type IRmrSender interface {
57         RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool
58 }
59
60 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
61         config := config.ParseConfiguration()
62         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
63                 StatDesc: "",
64                 RmrData: xapp.PortData{
65
66                         Name:              config.Name,
67                         MaxSize:           config.MaxSize,
68                         ThreadType:        config.ThreadType,
69                         LowLatency:        config.LowLatency,
70                         FastAck:           config.FastAck,
71                         MaxRetryOnFailure: config.MaxRetryOnFailure,
72                         Port:              config.Port,
73                 },
74         })
75
76         rmrsender := &RmrSender{
77                 rmrclient:     RMRclient,
78                 policyManager: policyManager,
79         }
80
81         rmrsender.RmrRecieveStart()
82         return rmrsender
83 }
84
85 var RICMessageTypes = map[string]int{
86         "A1_POLICY_REQ":         20010,
87         "A1_POLICY_RESP":        20011,
88         "A1_POLICY_QUERY":       20012,
89         "A1_EI_QUERY_ALL":       20013,
90         "AI_EI_QUERY_ALL_RESP":  20014,
91         "A1_EI_CREATE_JOB":      20015,
92         "A1_EI_CREATE_JOB_RESP": 20016,
93         "A1_EI_DATA_DELIVERY":   20017,
94 }
95
96 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
97         for k, v := range RICMessageTypes {
98                 if id == v {
99                         return k
100                 }
101         }
102         return
103 }
104
105 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int, subid int) bool {
106
107         params := &xapp.RMRParams{}
108         params.Mtype = messagetype
109         params.SubId = subid
110         params.Xid = ""
111         params.Meid = &xapp.RMRMeid{}
112         params.Src = a1SourceName
113         params.PayloadLen = len([]byte(httpBodyString))
114         params.Payload = []byte(httpBodyString)
115         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
116         a1.Logger.Debug("len payload %+v", len(params.Payload))
117         s := rmr.rmrclient.SendMsg(params)
118         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
119         return s
120 }
121
122 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
123         a1.Logger.Debug("In the Consume function")
124         id := rmr.GetRicMessageName(msg.Mtype)
125         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
126
127         switch id {
128
129         case "A1_POLICY_RESP":
130                 a1.Logger.Debug("Recived policy responose")
131                 payload := msg.Payload
132                 a1.Logger.Debug("message recieved : %s", payload)
133                 var result map[string]interface{}
134                 err := json.Unmarshal([]byte(payload), &result)
135                 if err != nil {
136                         a1.Logger.Error("Unmarshal error : %+v", err)
137                         return err
138                 }
139                 policyTypeId := int(result["policy_type_id"].(float64))
140                 policyInstanceId := result["policy_instance_id"].(string)
141                 policyHandlerId := result["handler_id"].(string)
142                 policyStatus := result["status"].(string)
143
144                 a1.Logger.Debug("message recieved for %d and %s with status : %s", policyTypeId, policyInstanceId, policyStatus)
145                 rmr.policyManager.SetPolicyInstanceStatus(policyTypeId, policyInstanceId, policyStatus)
146                 err = rmr.policyManager.SendPolicyStatusNotification(policyTypeId, policyInstanceId, policyHandlerId, policyStatus)
147                 if err != nil {
148                         a1.Logger.Debug("failed to send policy status notification %v+", err)
149                 }
150
151         case "A1_POLICY_QUERY":
152                 a1.Logger.Debug("Recived policy query")
153                 a1.Logger.Debug("message recieved ", msg.Payload)
154                 payload := msg.Payload
155                 var result map[string]interface{}
156                 json.Unmarshal([]byte(payload), &result)
157                 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
158                 policytypeid := (result["policy_type_id"].(float64))
159                 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
160                 if err1 != nil {
161                         a1.Logger.Error("Error : %+v", err1)
162                         return err1
163                 }
164                 a1.Logger.Debug("instanceList ", instanceList)
165                 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
166                 for _, policyinstanceid := range instanceList {
167                         policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
168                         if err2 != nil {
169                                 a1.Logger.Error("Error : %+v", err2)
170                                 return err2
171                         }
172                         a1.Logger.Debug("policyinstance ", policyinstance.(string))
173                         message := Message{}
174                         rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
175                         if err1 != nil {
176                                 a1.Logger.Error("error : %v", err1)
177                                 return err1
178                         }
179                         a1.Logger.Debug("rmrMessage ", rmrMessage)
180                         isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest, int(policytypeid))
181                         if isSent {
182                                 a1.Logger.Debug("rmrSendToXapp : message sent")
183                         } else {
184                                 a1.Logger.Error("rmrSendToXapp : message not sent")
185                         }
186                 }
187
188         case "A1_EI_QUERY_ALL":
189                 a1.Logger.Debug("message recieved ", msg.Payload)
190                 resp, err := http.Get(ecsEiTypePath)
191                 if err != nil {
192                         a1.Logger.Error("Received error while fetching health info: %v", err)
193                 }
194                 if resp.StatusCode != http.StatusOK {
195                         a1.Logger.Warning("Received no reponse from A1-EI service1")
196                 }
197                 a1.Logger.Debug("response from A1-EI service : ", resp)
198
199                 defer resp.Body.Close()
200                 respByte, err := ioutil.ReadAll(resp.Body)
201
202                 if err != nil {
203                         a1.Logger.Debug("error in response: %+v", respByte)
204                 }
205
206                 a1.Logger.Debug("response : %+v", string(respByte))
207
208                 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp, DefaultSubId)
209                 if isSent {
210                         a1.Logger.Debug("rmrSendToXapp : message sent")
211                 } else {
212                         a1.Logger.Error("rmrSendToXapp : message not sent")
213                 }
214         case "A1_EI_CREATE_JOB":
215                 payload := msg.Payload
216                 a1.Logger.Debug("message recieved : %s", payload)
217
218                 var result map[string]interface{}
219
220                 err := json.Unmarshal([]byte(payload), &result)
221                 if err != nil {
222                         a1.Logger.Error("Unmarshal error : %+v", err)
223                         return err
224                 }
225                 a1.Logger.Debug("Unmarshaled message recieved : %s ", result)
226
227                 jobIdStr := strconv.FormatInt((int64(result["job-id"].(float64))), 10)
228                 jsonReq, err := json.Marshal(result)
229                 if err != nil {
230                         a1.Logger.Error("marshal error : %v", err)
231                         return err
232                 }
233
234                 a1.Logger.Debug("url to send to :", ecsEiJobPath+jobIdStr)
235                 req, err := http.NewRequest(http.MethodPut, ecsEiJobPath+jobIdStr, bytes.NewBuffer(jsonReq))
236                 if err != nil {
237                         a1.Logger.Error("http error : %v", err)
238                         return err
239                 }
240
241                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
242                 client := &http.Client{}
243                 resp, err3 := client.Do(req)
244                 if err3 != nil {
245                         a1.Logger.Error("error:", err3)
246                         return err
247                 }
248
249                 defer resp.Body.Close()
250
251                 a1.Logger.Debug("response status : ", resp.StatusCode)
252                 if resp.StatusCode == 200 || resp.StatusCode == 201 {
253                         a1.Logger.Debug("received successful response for ei-job-id : ", jobIdStr)
254                         rmrData := fmt.Sprintf(jobCreationData, jobIdStr)
255                         a1.Logger.Debug("rmr_Data to send: ", rmrData)
256
257                         isSent := rmr.RmrSendToXapp(rmrData, a1EiCreateJobResp, DefaultSubId)
258                         if isSent {
259                                 a1.Logger.Debug("rmrSendToXapp : message sent")
260                         } else {
261                                 a1.Logger.Error("rmrSendToXapp : message not sent")
262                         }
263                 } else {
264                         a1.Logger.Warning("failed to create EIJOB ")
265                 }
266
267         default:
268                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
269         }
270
271         defer func() {
272                 rmr.rmrclient.Free(msg.Mbuf)
273                 msg.Mbuf = nil
274         }()
275         return
276 }
277
278 func (rmr *RmrSender) RmrRecieveStart() {
279         a1.Logger.Debug("Inside RmrRecieveStart function ")
280         go rmr.rmrclient.Start(rmr)
281         a1.Logger.Debug("Reciever started")
282 }