RMR handler for A1 policy query
[ric-plt/a1.git] / a1-go / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "encoding/json"
26         "strconv"
27
28         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
29         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
30         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
31         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 )
33
34 const (
35         a1SourceName = "service-ricplt-a1mediator-http"
36 )
37
38 type RmrSender struct {
39         rmrclient     *xapp.RMRClient
40         policyManager *policy.PolicyManager
41 }
42
43 type IRmrSender interface {
44         RmrSendToXapp(httpBodyString string, messagetype int) bool
45 }
46
47 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
48         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
49                 StatDesc: "",
50                 RmrData: xapp.PortData{
51                         //TODO: Read configuration from config file
52                         Name:              "",
53                         MaxSize:           65534,
54                         ThreadType:        0,
55                         LowLatency:        false,
56                         FastAck:           false,
57                         MaxRetryOnFailure: 1,
58                         Port:              4561,
59                 },
60         })
61
62         rmrsender := &RmrSender{
63                 rmrclient:     RMRclient,
64                 policyManager: policyManager,
65         }
66
67         rmrsender.RmrRecieveStart()
68         return rmrsender
69 }
70
71 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
72
73         params := &xapp.RMRParams{}
74         params.Mtype = messagetype
75         params.SubId = -1
76         params.Xid = ""
77         params.Meid = &xapp.RMRMeid{}
78         params.Src = a1SourceName
79         params.PayloadLen = len([]byte(httpBodyString))
80         params.Payload = []byte(httpBodyString)
81         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
82         a1.Logger.Debug("len payload %+v", len(params.Payload))
83         s := rmr.rmrclient.SendMsg(params)
84         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
85         return s
86 }
87
88 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
89         a1.Logger.Debug("In the Consume function")
90         id := xapp.Rmr.GetRicMessageName(msg.Mtype)
91         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
92
93         switch id {
94
95         case "A1_POLICY_RESP":
96                 a1.Logger.Debug("Recived policy responose")
97                 payload := msg.Payload
98                 a1.Logger.Debug("message recieved : %s", payload)
99                 var result map[string]interface{}
100                 err := json.Unmarshal([]byte(payload), &result)
101                 if err != nil {
102                         a1.Logger.Error("Unmarshal error : %+v", err)
103                         return err
104                 }
105                 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
106                 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
107         case "A1_POLICY_QUERY":
108                 a1.Logger.Debug("Recived policy query")
109                 a1.Logger.Debug("message recieved ", msg.Payload)
110                 payload := msg.Payload
111                 var result map[string]interface{}
112                 json.Unmarshal([]byte(payload), &result)
113                 a1.Logger.Debug("message recieved : %s for %d and %d", result, result["policy_type_id"], result["policy_instance_id"])
114                 policytypeid := (result["policy_type_id"].(float64))
115                 instanceList, err1 := rmr.policyManager.GetAllPolicyInstance(int(policytypeid))
116                 if err1 != nil {
117                         a1.Logger.Error("Error : %+v", err1)
118                         return err1
119                 }
120                 a1.Logger.Debug("instanceList ", instanceList)
121                 a1.Logger.Debug("Received a query for a known policy type: %d", policytypeid)
122                 for _, policyinstanceid := range instanceList {
123                         policyinstance, err2 := rmr.policyManager.GetPolicyInstance(models.PolicyTypeID(policytypeid), policyinstanceid)
124                         if err2 != nil {
125                                 a1.Logger.Error("Error : %+v", err2)
126                                 return err2
127                         }
128                         a1.Logger.Debug("policyinstance ", policyinstance.(string))
129                         message := Message{}
130                         rmrMessage, err1 := message.PolicyMessage(strconv.FormatInt((int64(policytypeid)), 10), string(policyinstanceid), policyinstance.(string), "CREATE")
131                         if err1 != nil {
132                                 a1.Logger.Error("error : %v", err1)
133                                 return err1
134                         }
135                         a1.Logger.Debug("rmrMessage ", rmrMessage)
136                         isSent := rmr.RmrSendToXapp(rmrMessage, a1PolicyRequest)
137                         if isSent {
138                                 a1.Logger.Debug("rmrSendToXapp : message sent")
139                         } else {
140                                 a1.Logger.Error("rmrSendToXapp : message not sent")
141                         }
142                 }
143         default:
144                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
145         }
146
147         defer func() {
148                 rmr.rmrclient.Free(msg.Mbuf)
149                 msg.Mbuf = nil
150         }()
151         return
152 }
153
154 func (rmr *RmrSender) RmrRecieveStart() {
155         a1.Logger.Debug("Inside RmrRecieveStart function ")
156         rmr.rmrclient.Start(rmr)
157         a1.Logger.Debug("Reciever started")
158 }