Mostly integration test work:
[ric-plt/a1.git] / integration_tests / testxappcode / receiver.go
1 /*
2 ==================================================================================
3   Copyright (c) 2020 AT&T Intellectual Property.
4   Copyright (c) 2020 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19 package main
20
21 import (
22         "encoding/json"
23         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
24         "os"
25         "strconv"
26         "time"
27 )
28
29 var delay int        // used for the delay receiver
30 var handlerID string // used for the delay receiver too
31 var doQuery bool     // used for the query receiver
32
33 type a1Receiver struct {
34         msgChan  chan *xapp.RMRParams
35         appReady bool
36         rmrReady bool
37 }
38
39 type policyRequest struct {
40         Operation        string      `json:"operation"`
41         PolicyTypeID     int         `json:"policy_type_id"`
42         PolicyInstanceID string      `json:"policy_instance_id"`
43         Pay              interface{} `json:"payload"`
44 }
45
46 type policyRequestResponse struct {
47         PolicyTypeID     int    `json:"policy_type_id"`
48         PolicyInstanceID string `json:"policy_instance_id"`
49         HandlerID        string `json:"handler_id"`
50         Status           string `json:"status"`
51 }
52
53 type policyQuery struct {
54         PolicyTypeID int `json:"policy_type_id"`
55 }
56
57 func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) {
58         // helper for rmr that handles retries and sleep
59         retries := 0
60         for { // just keep trying until it works
61                 if e.rmrReady { // we must wait for ready, else SendMsg will blow with a nullptr
62                         if ok := xapp.Rmr.SendMsg(params); ok {
63                                 xapp.Logger.Info("Msg successfully sent after %d retries!", retries)
64                                 return
65                         }
66                         retries++
67                         //xapp.Logger.Info("Query failed to send...")
68                 } else {
69                         xapp.Logger.Info("rmr not ready...")
70                         time.Sleep(time.Duration(1) * time.Second)
71                 }
72         }
73 }
74
75 func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) {
76
77         // unmarshal the request
78         var dat policyRequest
79         if err := json.Unmarshal(msg.Payload, &dat); err != nil {
80                 panic(err)
81         }
82
83         var status string
84         switch dat.Operation {
85         case "CREATE":
86                 status = "OK"
87         case "DELETE":
88                 status = "DELETED"
89         }
90
91         // form the response
92         res := &policyRequestResponse{
93                 dat.PolicyTypeID,
94                 dat.PolicyInstanceID,
95                 "test_receiver",
96                 status,
97         }
98
99         outgoing, err := json.Marshal(res)
100         if err != nil {
101                 panic(err)
102         }
103
104         /*
105                 WARNING:
106                 we want to use rts here. However, the current go xapp framework rts is broken.
107         */
108         params := &xapp.RMRParams{
109                 Mtype:   20011,
110                 Payload: outgoing,
111         }
112
113         if delay > 0 {
114                 xapp.Logger.Info("Xapp is sleeping...")
115                 time.Sleep(time.Duration(delay) * time.Second) // so much work to replicate python's time.sleep(5)...
116         }
117
118         e.sendMsgRetry(params)
119
120         xapp.Logger.Info("Policy response sent!")
121 }
122
123 func (e *a1Receiver) sendQuery() {
124         // form the query
125         res := &policyQuery{
126                 1006001,
127         }
128         outgoing, err := json.Marshal(res)
129         if err != nil {
130                 panic(err)
131         }
132         params := &xapp.RMRParams{
133                 Mtype:   20012,
134                 Payload: outgoing,
135         }
136
137         for {
138                 /* We do this in a loop here, because even when the query first works, it could be the case that
139                    a1 does not even have the type yet, or there are no instances yet. In this integration test,
140                    we just keep pounding away so that eventually a1 returns the list this int test is looking for.
141                    A real xapp would NOT call the query in a loop like this.
142                 */
143                 e.sendMsgRetry(params)
144                 xapp.Logger.Info("Query sent successfully")
145                 time.Sleep(time.Duration(1) * time.Second)
146         }
147 }
148
149 func (e *a1Receiver) messageLoop() {
150         for {
151                 xapp.Logger.Info("Waiting for message..")
152
153                 msg := <-e.msgChan
154
155                 xapp.Logger.Info("Message received!")
156                 defer xapp.Rmr.Free(msg.Mbuf)
157
158                 switch msg.Mtype {
159                 case 20010:
160                         e.handlePolicyReq(msg)
161                 default:
162                         panic("Unexpected message type!")
163                 }
164         }
165 }
166
167 // Consume: This named function is a required callback for e to use the xapp interface. it is called on all received rmr messages.
168 func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) {
169         e.msgChan <- rp
170         return
171 }
172
173 func (e *a1Receiver) Run() {
174         // Set MDC (read: name visible in the logs)
175         xapp.Logger.SetMdc(handlerID, "0.1.0")
176
177         /* from reading the xapp frame code...
178                    this SetReadyCB sets off a chain of events..
179                it sets readycb and readycbparams at the module level in xapp.go
180                    nothing happens yet..
181                    when the xapp is ran with` xapp.Run, this callback actually gets passed into the Rmr client which is not exposed in the xapp
182                        Rmr.SetReadyCB(xappReadyCb, nil)
183                    This "primes" the rmr client with it's own readycb, which is now set to this callback function
184                    When the rmr client is ready, it invokes the callback
185                    so basically, when rmr is ready, this function is invoked
186                    I think the xapp frame code could have been greatly simplified by just passing this into the invocation of Run() and then just passing that into the rmr client init!
187         */
188         xapp.SetReadyCB(func(d interface{}) { e.rmrReady = true }, true)
189
190         // start message loop. We cannot wait for e.rmrReady here since that doesn't get populated until Run() runs.
191         go e.messageLoop()
192
193         if doQuery {
194                 // we are in the query tester; kick off a loop that does that until it works
195                 go e.sendQuery()
196         }
197
198         xapp.Run(e)
199 }
200
201 func newA1Receiver(appReady, rmrReady bool) *a1Receiver {
202         return &a1Receiver{
203                 msgChan:  make(chan *xapp.RMRParams),
204                 rmrReady: rmrReady,
205                 appReady: appReady,
206         }
207 }
208
209 func main() {
210
211         delay = 0
212         if d, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok {
213                 delay, _ = strconv.Atoi(d)
214         }
215
216         handlerID = "test_receiver"
217         if hid, ok := os.LookupEnv("HANDLER_ID"); ok {
218                 handlerID = hid
219         }
220
221         doQuery = false
222         if _, ok := os.LookupEnv("DO_QUERY"); ok {
223                 doQuery = true
224         }
225
226         newA1Receiver(true, false).Run()
227 }