/* ================================================================================== Copyright (c) 2020 AT&T Intellectual Property. Copyright (c) 2020 Nokia Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ================================================================================== */ package main import ( "encoding/json" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "os" "strconv" "time" ) var delay int // used for the delay receiver var handlerID string // used for the delay receiver too var doQuery bool // used for the query receiver type a1Receiver struct { msgChan chan *xapp.RMRParams appReady bool rmrReady bool } type policyRequest struct { Operation string `json:"operation"` PolicyTypeID int `json:"policy_type_id"` PolicyInstanceID string `json:"policy_instance_id"` Pay interface{} `json:"payload"` } type policyRequestResponse struct { PolicyTypeID int `json:"policy_type_id"` PolicyInstanceID string `json:"policy_instance_id"` HandlerID string `json:"handler_id"` Status string `json:"status"` } type policyQuery struct { PolicyTypeID int `json:"policy_type_id"` } func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) { // helper for rmr that handles retries and sleep retries := 0 for { // just keep trying until it works if e.rmrReady { // we must wait for ready, else SendMsg will blow with a nullptr if ok := xapp.Rmr.SendMsg(params); ok { xapp.Logger.Info("Msg successfully sent after %d retries!", retries) return } retries++ //xapp.Logger.Info("Query failed to send...") } else { xapp.Logger.Info("rmr not ready...") time.Sleep(time.Duration(1) * time.Second) } } } func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) { // unmarshal the request var dat policyRequest if err := json.Unmarshal(msg.Payload, &dat); err != nil { panic(err) } var status string switch dat.Operation { case "CREATE": status = "OK" case "DELETE": status = "DELETED" } // form the response res := &policyRequestResponse{ dat.PolicyTypeID, dat.PolicyInstanceID, "test_receiver", status, } outgoing, err := json.Marshal(res) if err != nil { panic(err) } /* WARNING: we want to use rts here. However, the current go xapp framework rts is broken. */ params := &xapp.RMRParams{ Mtype: 20011, Payload: outgoing, } if delay > 0 { xapp.Logger.Info("Xapp is sleeping...") time.Sleep(time.Duration(delay) * time.Second) // so much work to replicate python's time.sleep(5)... } e.sendMsgRetry(params) xapp.Logger.Info("Policy response sent!") } func (e *a1Receiver) sendQuery() { // form the query res := &policyQuery{ 1006001, } outgoing, err := json.Marshal(res) if err != nil { panic(err) } params := &xapp.RMRParams{ Mtype: 20012, Payload: outgoing, } for { /* We do this in a loop here, because even when the query first works, it could be the case that a1 does not even have the type yet, or there are no instances yet. In this integration test, we just keep pounding away so that eventually a1 returns the list this int test is looking for. A real xapp would NOT call the query in a loop like this. */ e.sendMsgRetry(params) xapp.Logger.Info("Query sent successfully") time.Sleep(time.Duration(1) * time.Second) } } func (e *a1Receiver) messageLoop() { for { xapp.Logger.Info("Waiting for message..") msg := <-e.msgChan xapp.Logger.Info("Message received!") defer xapp.Rmr.Free(msg.Mbuf) switch msg.Mtype { case 20010: e.handlePolicyReq(msg) default: panic("Unexpected message type!") } } } // Consume: This named function is a required callback for e to use the xapp interface. it is called on all received rmr messages. func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) { e.msgChan <- rp return } func (e *a1Receiver) Run() { // Set MDC (read: name visible in the logs) xapp.Logger.SetMdc(handlerID, "0.1.0") /* from reading the xapp frame code... this SetReadyCB sets off a chain of events.. it sets readycb and readycbparams at the module level in xapp.go nothing happens yet.. when the xapp is ran with` xapp.Run, this callback actually gets passed into the Rmr client which is not exposed in the xapp Rmr.SetReadyCB(xappReadyCb, nil) This "primes" the rmr client with it's own readycb, which is now set to this callback function When the rmr client is ready, it invokes the callback so basically, when rmr is ready, this function is invoked I think the xapp frame code could have been greatly simplified by just passing this into the invocation of Run() and then just passing that into the rmr client init! */ xapp.SetReadyCB(func(d interface{}) { e.rmrReady = true }, true) // start message loop. We cannot wait for e.rmrReady here since that doesn't get populated until Run() runs. go e.messageLoop() if doQuery { // we are in the query tester; kick off a loop that does that until it works go e.sendQuery() } xapp.Run(e) } func newA1Receiver(appReady, rmrReady bool) *a1Receiver { return &a1Receiver{ msgChan: make(chan *xapp.RMRParams), rmrReady: rmrReady, appReady: appReady, } } func main() { delay = 0 if d, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok { delay, _ = strconv.Atoi(d) } handlerID = "test_receiver" if hid, ok := os.LookupEnv("HANDLER_ID"); ok { handlerID = hid } doQuery = false if _, ok := os.LookupEnv("DO_QUERY"); ok { doQuery = true } newA1Receiver(true, false).Run() }