2bb2f1b2794c79612d9e07c7cbf245777cd62076
[ric-plt/a1.git] / a1-go / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "encoding/json"
26         "io/ioutil"
27         "net/http"
28         "strconv"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
31         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
32         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
33         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 )
35
36 const (
37         a1SourceName     = "service-ricplt-a1mediator-http"
38         a1PolicyRequest  = 20010
39         ecsServiceHost   = "http://ecs-service:8083"
40         ecsEiTypePath    = ecsServiceHost + "/A1-EI/v1/eitypes"
41         a1EiQueryAllResp = 20014
42 )
43
44 type RmrSender struct {
45         rmrclient     *xapp.RMRClient
46         policyManager *policy.PolicyManager
47 }
48
49 type IRmrSender interface {
50         RmrSendToXapp(httpBodyString string, messagetype int) bool
51 }
52
53 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
54         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
55                 StatDesc: "",
56                 RmrData: xapp.PortData{
57                         //TODO: Read configuration from config file
58                         Name:              "",
59                         MaxSize:           65534,
60                         ThreadType:        0,
61                         LowLatency:        false,
62                         FastAck:           false,
63                         MaxRetryOnFailure: 1,
64                         Port:              4561,
65                 },
66         })
67
68         rmrsender := &RmrSender{
69                 rmrclient:     RMRclient,
70                 policyManager: policyManager,
71         }
72
73         rmrsender.RmrRecieveStart()
74         return rmrsender
75 }
76
77 var RICMessageTypes = map[string]int{
78         "A1_POLICY_REQ":         20010,
79         "A1_POLICY_RESP":        20011,
80         "A1_POLICY_QUERY":       20012,
81         "A1_EI_QUERY_ALL":       20013,
82         "AI_EI_QUERY_ALL_RESP":  20014,
83         "A1_EI_CREATE_JOB":      20015,
84         "A1_EI_CREATE_JOB_RESP": 20016,
85         "A1_EI_DATA_DELIVERY":   20017,
86 }
87
88 func (rmr *RmrSender) GetRicMessageName(id int) (s string) {
89         for k, v := range RICMessageTypes {
90                 if id == v {
91                         return k
92                 }
93         }
94         return
95 }
96
97 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
98
99         params := &xapp.RMRParams{}
100         params.Mtype = messagetype
101         params.SubId = -1
102         params.Xid = ""
103         params.Meid = &xapp.RMRMeid{}
104         params.Src = a1SourceName
105         params.PayloadLen = len([]byte(httpBodyString))
106         params.Payload = []byte(httpBodyString)
107         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
108         a1.Logger.Debug("len payload %+v", len(params.Payload))
109         s := rmr.rmrclient.SendMsg(params)
110         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
111         return s
112 }
113
114 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
115         a1.Logger.Debug("In the Consume function")
116         id := rmr.GetRicMessageName(msg.Mtype)
117         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
118
119         switch id {
120
121         case "A1_POLICY_RESP":
122                 a1.Logger.Debug("Recived policy responose")
123                 payload := msg.Payload
124                 a1.Logger.Debug("message recieved : %s", payload)
125                 var result map[string]interface{}
126                 err := json.Unmarshal([]byte(payload), &result)
127                 if err != nil {
128                         a1.Logger.Error("Unmarshal error : %+v", err)
129                         return err
130                 }
131                 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
132                 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
133         case "A1_POLICY_QUERY":
134                 a1.Logger.Debug("Recived policy query")
135                 a1.Logger.Debug("message recieved ", msg.Payload)
136                 payload := msg.Payload
137                 var result map[string]interface{}
138                 json.Unmarshal([]byte(payload), &result)
139                 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
140                 policytypeid := (result["policy_type_id"].(float64))
141                 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
142                 if err1 != nil {
143                         a1.Logger.Error("Error : %+v", err1)
144                         return err1
145                 }
146                 a1.Logger.Debug("instanceList ", instanceList)
147                 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
148                 for _, policyinstanceid := range instanceList {
149                         policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
150                         if err2 != nil {
151                                 a1.Logger.Error("Error : %+v", err2)
152                                 return err2
153                         }
154                         a1.Logger.Debug("policyinstance ", policyinstance.(string))
155                         message := Message{}
156                         rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
157                         if err1 != nil {
158                                 a1.Logger.Error("error : %v", err1)
159                                 return err1
160                         }
161                         a1.Logger.Debug("rmrMessage ", rmrMessage)
162                         isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
163                         if isSent {
164                                 a1.Logger.Debug("rmrSendToXapp : message sent")
165                         } else {
166                                 a1.Logger.Error("rmrSendToXapp : message not sent")
167                         }
168                 }
169
170         case "A1_EI_QUERY_ALL":
171                 a1.Logger.Debug("message recieved ", msg.Payload)
172                 resp, err := http.Get(ecsEiTypePath)
173                 if err != nil {
174                         a1.Logger.Error("Received error while fetching health info: %v", err)
175                 }
176                 if resp.StatusCode != http.StatusOK {
177                         a1.Logger.Warning("Received no reponse from A1-EI service1")
178                 }
179                 a1.Logger.Debug("response from A1-EI service : ", resp)
180
181                 defer resp.Body.Close()
182                 respByte, err := ioutil.ReadAll(resp.Body)
183
184                 if err != nil {
185                         a1.Logger.Debug("error in response: %+v", respByte)
186                 }
187
188                 a1.Logger.Debug("response : %+v", string(respByte))
189
190                 isSent := rmr.RmrSendToXapp(string(respByte), a1EiQueryAllResp)
191                 if isSent {
192                         a1.Logger.Debug("rmrSendToXapp : message sent")
193                 } else {
194                         a1.Logger.Error("rmrSendToXapp : message not sent")
195                 }
196
197         default:
198                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
199         }
200
201         defer func() {
202                 rmr.rmrclient.Free(msg.Mbuf)
203                 msg.Mbuf = nil
204         }()
205         return
206 }
207
208 func (rmr *RmrSender) RmrRecieveStart() {
209         a1.Logger.Debug("Inside RmrRecieveStart function ")
210         rmr.rmrclient.Start(rmr)
211         a1.Logger.Debug("Reciever started")
212 }