2 ==================================================================================
3 Copyright (c) 2020 AT&T Intellectual Property.
4 Copyright (c) 2020 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
23 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 var delay int // used for the delay receiver
30 var handlerID string // used for the delay receiver too
31 var doQuery bool // used for the query receiver
33 type a1Receiver struct {
34 msgChan chan *xapp.RMRParams
39 type policyRequest struct {
40 Operation string `json:"operation"`
41 PolicyTypeID int `json:"policy_type_id"`
42 PolicyInstanceID string `json:"policy_instance_id"`
43 Pay interface{} `json:"payload"`
46 type policyRequestResponse struct {
47 PolicyTypeID int `json:"policy_type_id"`
48 PolicyInstanceID string `json:"policy_instance_id"`
49 HandlerID string `json:"handler_id"`
50 Status string `json:"status"`
53 type policyQuery struct {
54 PolicyTypeID int `json:"policy_type_id"`
57 func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) {
58 // helper for rmr that handles retries and sleep
60 for { // just keep trying until it works
61 if e.rmrReady { // we must wait for ready, else SendMsg will blow with a nullptr
62 if ok := xapp.Rmr.SendMsg(params); ok {
63 xapp.Logger.Info("Msg successfully sent after %d retries!", retries)
67 //xapp.Logger.Info("Query failed to send...")
69 xapp.Logger.Info("rmr not ready...")
70 time.Sleep(time.Duration(1) * time.Second)
75 func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) {
77 // unmarshal the request
79 if err := json.Unmarshal(msg.Payload, &dat); err != nil {
84 switch dat.Operation {
92 res := &policyRequestResponse{
99 outgoing, err := json.Marshal(res)
106 we want to use rts here. However, the current go xapp framework rts is broken.
108 params := &xapp.RMRParams{
114 xapp.Logger.Info("Xapp is sleeping...")
115 time.Sleep(time.Duration(delay) * time.Second) // so much work to replicate python's time.sleep(5)...
118 e.sendMsgRetry(params)
120 xapp.Logger.Info("Policy response sent!")
123 func (e *a1Receiver) sendQuery() {
128 outgoing, err := json.Marshal(res)
132 params := &xapp.RMRParams{
138 /* We do this in a loop here, because even when the query first works, it could be the case that
139 a1 does not even have the type yet, or there are no instances yet. In this integration test,
140 we just keep pounding away so that eventually a1 returns the list this int test is looking for.
141 A real xapp would NOT call the query in a loop like this.
143 e.sendMsgRetry(params)
144 xapp.Logger.Info("Query sent successfully")
145 time.Sleep(time.Duration(1) * time.Second)
149 func (e *a1Receiver) messageLoop() {
151 xapp.Logger.Info("Waiting for message..")
155 xapp.Logger.Info("Message received!")
156 defer xapp.Rmr.Free(msg.Mbuf)
160 e.handlePolicyReq(msg)
162 panic("Unexpected message type!")
167 // Consume: This named function is a required callback for e to use the xapp interface. it is called on all received rmr messages.
168 func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) {
173 func (e *a1Receiver) Run() {
174 // Set MDC (read: name visible in the logs)
175 xapp.Logger.SetMdc(handlerID, "0.1.0")
177 /* from reading the xapp frame code...
178 this SetReadyCB sets off a chain of events..
179 it sets readycb and readycbparams at the module level in xapp.go
180 nothing happens yet..
181 when the xapp is ran with` xapp.Run, this callback actually gets passed into the Rmr client which is not exposed in the xapp
182 Rmr.SetReadyCB(xappReadyCb, nil)
183 This "primes" the rmr client with it's own readycb, which is now set to this callback function
184 When the rmr client is ready, it invokes the callback
185 so basically, when rmr is ready, this function is invoked
186 I think the xapp frame code could have been greatly simplified by just passing this into the invocation of Run() and then just passing that into the rmr client init!
188 xapp.SetReadyCB(func(d interface{}) { e.rmrReady = true }, true)
190 // start message loop. We cannot wait for e.rmrReady here since that doesn't get populated until Run() runs.
194 // we are in the query tester; kick off a loop that does that until it works
201 func newA1Receiver(appReady, rmrReady bool) *a1Receiver {
203 msgChan: make(chan *xapp.RMRParams),
212 if d, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok {
213 delay, _ = strconv.Atoi(d)
216 handlerID = "test_receiver"
217 if hid, ok := os.LookupEnv("HANDLER_ID"); ok {
222 if _, ok := os.LookupEnv("DO_QUERY"); ok {
226 newA1Receiver(true, false).Run()