Mostly integration test work:
[ric-plt/a1.git] / integration_tests / testxappcode / receiver.go
diff --git a/integration_tests/testxappcode/receiver.go b/integration_tests/testxappcode/receiver.go
new file mode 100755 (executable)
index 0000000..9012094
--- /dev/null
@@ -0,0 +1,227 @@
+/*
+==================================================================================
+  Copyright (c) 2020 AT&T Intellectual Property.
+  Copyright (c) 2020 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+package main
+
+import (
+       "encoding/json"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "os"
+       "strconv"
+       "time"
+)
+
+var delay int        // used for the delay receiver
+var handlerID string // used for the delay receiver too
+var doQuery bool     // used for the query receiver
+
+type a1Receiver struct {
+       msgChan  chan *xapp.RMRParams
+       appReady bool
+       rmrReady bool
+}
+
+type policyRequest struct {
+       Operation        string      `json:"operation"`
+       PolicyTypeID     int         `json:"policy_type_id"`
+       PolicyInstanceID string      `json:"policy_instance_id"`
+       Pay              interface{} `json:"payload"`
+}
+
+type policyRequestResponse struct {
+       PolicyTypeID     int    `json:"policy_type_id"`
+       PolicyInstanceID string `json:"policy_instance_id"`
+       HandlerID        string `json:"handler_id"`
+       Status           string `json:"status"`
+}
+
+type policyQuery struct {
+       PolicyTypeID int `json:"policy_type_id"`
+}
+
+func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) {
+       // helper for rmr that handles retries and sleep
+       retries := 0
+       for { // just keep trying until it works
+               if e.rmrReady { // we must wait for ready, else SendMsg will blow with a nullptr
+                       if ok := xapp.Rmr.SendMsg(params); ok {
+                               xapp.Logger.Info("Msg successfully sent after %d retries!", retries)
+                               return
+                       }
+                       retries++
+                       //xapp.Logger.Info("Query failed to send...")
+               } else {
+                       xapp.Logger.Info("rmr not ready...")
+                       time.Sleep(time.Duration(1) * time.Second)
+               }
+       }
+}
+
+func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) {
+
+       // unmarshal the request
+       var dat policyRequest
+       if err := json.Unmarshal(msg.Payload, &dat); err != nil {
+               panic(err)
+       }
+
+       var status string
+       switch dat.Operation {
+       case "CREATE":
+               status = "OK"
+       case "DELETE":
+               status = "DELETED"
+       }
+
+       // form the response
+       res := &policyRequestResponse{
+               dat.PolicyTypeID,
+               dat.PolicyInstanceID,
+               "test_receiver",
+               status,
+       }
+
+       outgoing, err := json.Marshal(res)
+       if err != nil {
+               panic(err)
+       }
+
+       /*
+               WARNING:
+               we want to use rts here. However, the current go xapp framework rts is broken.
+       */
+       params := &xapp.RMRParams{
+               Mtype:   20011,
+               Payload: outgoing,
+       }
+
+       if delay > 0 {
+               xapp.Logger.Info("Xapp is sleeping...")
+               time.Sleep(time.Duration(delay) * time.Second) // so much work to replicate python's time.sleep(5)...
+       }
+
+       e.sendMsgRetry(params)
+
+       xapp.Logger.Info("Policy response sent!")
+}
+
+func (e *a1Receiver) sendQuery() {
+       // form the query
+       res := &policyQuery{
+               1006001,
+       }
+       outgoing, err := json.Marshal(res)
+       if err != nil {
+               panic(err)
+       }
+       params := &xapp.RMRParams{
+               Mtype:   20012,
+               Payload: outgoing,
+       }
+
+       for {
+               /* We do this in a loop here, because even when the query first works, it could be the case that
+                  a1 does not even have the type yet, or there are no instances yet. In this integration test,
+                  we just keep pounding away so that eventually a1 returns the list this int test is looking for.
+                  A real xapp would NOT call the query in a loop like this.
+               */
+               e.sendMsgRetry(params)
+               xapp.Logger.Info("Query sent successfully")
+               time.Sleep(time.Duration(1) * time.Second)
+       }
+}
+
+func (e *a1Receiver) messageLoop() {
+       for {
+               xapp.Logger.Info("Waiting for message..")
+
+               msg := <-e.msgChan
+
+               xapp.Logger.Info("Message received!")
+               defer xapp.Rmr.Free(msg.Mbuf)
+
+               switch msg.Mtype {
+               case 20010:
+                       e.handlePolicyReq(msg)
+               default:
+                       panic("Unexpected message type!")
+               }
+       }
+}
+
+// Consume: This named function is a required callback for e to use the xapp interface. it is called on all received rmr messages.
+func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) {
+       e.msgChan <- rp
+       return
+}
+
+func (e *a1Receiver) Run() {
+       // Set MDC (read: name visible in the logs)
+       xapp.Logger.SetMdc(handlerID, "0.1.0")
+
+       /* from reading the xapp frame code...
+                  this SetReadyCB sets off a chain of events..
+              it sets readycb and readycbparams at the module level in xapp.go
+                  nothing happens yet..
+                  when the xapp is ran with` xapp.Run, this callback actually gets passed into the Rmr client which is not exposed in the xapp
+                      Rmr.SetReadyCB(xappReadyCb, nil)
+                  This "primes" the rmr client with it's own readycb, which is now set to this callback function
+                  When the rmr client is ready, it invokes the callback
+                  so basically, when rmr is ready, this function is invoked
+                  I think the xapp frame code could have been greatly simplified by just passing this into the invocation of Run() and then just passing that into the rmr client init!
+       */
+       xapp.SetReadyCB(func(d interface{}) { e.rmrReady = true }, true)
+
+       // start message loop. We cannot wait for e.rmrReady here since that doesn't get populated until Run() runs.
+       go e.messageLoop()
+
+       if doQuery {
+               // we are in the query tester; kick off a loop that does that until it works
+               go e.sendQuery()
+       }
+
+       xapp.Run(e)
+}
+
+func newA1Receiver(appReady, rmrReady bool) *a1Receiver {
+       return &a1Receiver{
+               msgChan:  make(chan *xapp.RMRParams),
+               rmrReady: rmrReady,
+               appReady: appReady,
+       }
+}
+
+func main() {
+
+       delay = 0
+       if d, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok {
+               delay, _ = strconv.Atoi(d)
+       }
+
+       handlerID = "test_receiver"
+       if hid, ok := os.LookupEnv("HANDLER_ID"); ok {
+               handlerID = hid
+       }
+
+       doQuery = false
+       if _, ok := os.LookupEnv("DO_QUERY"); ok {
+               doQuery = true
+       }
+
+       newA1Receiver(true, false).Run()
+}