2e158f834ef199d26f5e7dc530fce564a0d9139c
[ric-plt/a1.git] / a1-go / pkg / rmr / rmr.go
1 /*
2 ==================================================================================
3   Copyright (c) 2022 Samsung
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16
17    This source code is part of the near-RT RIC (RAN Intelligent Controller)
18    platform project (RICP).
19 ==================================================================================
20 */
21
22 package rmr
23
24 import (
25         "encoding/json"
26
27         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
28         "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/policy"
29         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 )
31
32 const (
33         a1SourceName = "service-ricplt-a1mediator-http"
34 )
35
36 type RmrSender struct {
37         rmrclient     *xapp.RMRClient
38         policyManager *policy.PolicyManager
39 }
40
41 type IRmrSender interface {
42         RmrSendToXapp(httpBodyString string, messagetype int) bool
43 }
44
45 func NewRMRSender(policyManager *policy.PolicyManager) IRmrSender {
46         RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
47                 StatDesc: "",
48                 RmrData: xapp.PortData{
49                         //TODO: Read configuration from config file
50                         Name:              "",
51                         MaxSize:           65534,
52                         ThreadType:        0,
53                         LowLatency:        false,
54                         FastAck:           false,
55                         MaxRetryOnFailure: 1,
56                         Port:              4561,
57                 },
58         })
59
60         rmrsender := &RmrSender{
61                 rmrclient:     RMRclient,
62                 policyManager: policyManager,
63         }
64
65         rmrsender.RmrRecieveStart()
66         return rmrsender
67 }
68
69 func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
70
71         params := &xapp.RMRParams{}
72         params.Mtype = messagetype
73         params.SubId = -1
74         params.Xid = ""
75         params.Meid = &xapp.RMRMeid{}
76         params.Src = a1SourceName
77         params.PayloadLen = len([]byte(httpBodyString))
78         params.Payload = []byte(httpBodyString)
79         a1.Logger.Debug("MSG to XAPP: %s ", params.String())
80         a1.Logger.Debug("len payload %+v", len(params.Payload))
81         s := rmr.rmrclient.SendMsg(params)
82         a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
83         return s
84 }
85
86 func (rmr *RmrSender) Consume(msg *xapp.RMRParams) (err error) {
87         a1.Logger.Debug("In the Consume function")
88         id := xapp.Rmr.GetRicMessageName(msg.Mtype)
89         a1.Logger.Debug("Message received: name=%s meid=%s subId=%d txid=%s len=%d", id, msg.Meid.RanName, msg.SubId, msg.Xid, msg.PayloadLen)
90
91         switch id {
92
93         case "A1_POLICY_RESP":
94                 a1.Logger.Debug("Recived policy responose")
95                 payload := msg.Payload
96                 a1.Logger.Debug("message recieved : %s", payload)
97                 var result map[string]interface{}
98                 err := json.Unmarshal([]byte(payload), &result)
99                 if err != nil {
100                         a1.Logger.Error("Unmarshal error : %+v", err)
101                         return err
102                 }
103                 a1.Logger.Debug("message recieved for %d and %d with status : %s", result["policy_type_id"], result["policy_instance_id"], result["status"])
104                 rmr.policyManager.SetPolicyInstanceStatus(int(result["policy_type_id"].(float64)), int(result["policy_instance_id"].(float64)), result["status"].(string))
105         default:
106                 xapp.Logger.Error("Unknown message type '%d', discarding", msg.Mtype)
107         }
108
109         defer func() {
110                 rmr.rmrclient.Free(msg.Mbuf)
111                 msg.Mbuf = nil
112         }()
113         return
114 }
115
116 func (rmr *RmrSender) RmrRecieveStart() {
117         a1.Logger.Debug("Inside RmrRecieveStart function ")
118         rmr.rmrclient.Start(rmr)
119         a1.Logger.Debug("Reciever started")
120 }