RICPLT-3014 Subscription multiple endpoints
[ric-plt/submgr.git] / pkg / control / main_test.go
index 1c69b01..ab0bff6 100644 (file)
@@ -21,7 +21,6 @@ package control
 
 import (
        "encoding/json"
-       "errors"
        "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
@@ -30,6 +29,7 @@ import (
        "os"
        "strconv"
        "strings"
+       "sync"
        "testing"
        "time"
 )
@@ -37,23 +37,127 @@ import (
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type testingControl struct {
+
+type httpEventWaiter struct {
+       resultChan   chan bool
+       nextActionOk bool
+}
+
+func (msg *httpEventWaiter) SetResult(res bool) {
+       msg.resultChan <- res
+}
+
+func (msg *httpEventWaiter) WaitResult(t *testing.T) bool {
+       select {
+       case result := <-msg.resultChan:
+               return result
+       case <-time.After(15 * time.Second):
+               testError(t, "Waiter not received result status from case within 15 secs")
+               return false
+       }
+       testError(t, "Waiter error in default branch")
+       return false
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingHttpRtmgrControl struct {
+       sync.Mutex
+       desc        string
+       port        string
+       eventWaiter *httpEventWaiter
+}
+
+func (hc *testingHttpRtmgrControl) NextEvent(eventWaiter *httpEventWaiter) {
+       hc.Lock()
+       defer hc.Unlock()
+       hc.eventWaiter = eventWaiter
+}
+
+func (hc *testingHttpRtmgrControl) AllocNextEvent(nextAction bool) *httpEventWaiter {
+       eventWaiter := &httpEventWaiter{
+               resultChan:   make(chan bool),
+               nextActionOk: nextAction,
+       }
+       hc.NextEvent(eventWaiter)
+       return eventWaiter
+}
+
+func (hc *testingHttpRtmgrControl) http_handler(w http.ResponseWriter, r *http.Request) {
+
+       hc.Lock()
+       defer hc.Unlock()
+
+       var req rtmgr_models.XappSubscriptionData
+       err := json.NewDecoder(r.Body).Decode(&req)
+       if err != nil {
+               xapp.Logger.Error("%s", err.Error())
+       }
+       xapp.Logger.Info("(%s) handling Address=%s Port=%d SubscriptionID=%d", hc.desc, *req.Address, *req.Port, *req.SubscriptionID)
+
+       var code int = 0
+       switch r.Method {
+       case http.MethodPost:
+               code = 201
+               if hc.eventWaiter != nil {
+                       if hc.eventWaiter.nextActionOk == false {
+                               code = 400
+                       }
+               }
+       case http.MethodDelete:
+               code = 200
+               if hc.eventWaiter != nil {
+                       if hc.eventWaiter.nextActionOk == false {
+                               code = 400
+                       }
+               }
+       default:
+               code = 200
+       }
+
+       waiter := hc.eventWaiter
+       hc.eventWaiter = nil
+       if waiter != nil {
+               waiter.SetResult(true)
+       }
+       xapp.Logger.Info("(%s) Method=%s Reply with code %d", hc.desc, r.Method, code)
+       w.WriteHeader(code)
+
+}
+
+func (hc *testingHttpRtmgrControl) run() {
+       http.HandleFunc("/", hc.http_handler)
+       http.ListenAndServe("localhost:"+hc.port, nil)
+}
+
+func initTestingHttpRtmgrControl(desc string, port string) *testingHttpRtmgrControl {
+       hc := &testingHttpRtmgrControl{}
+       hc.desc = desc
+       hc.port = port
+       return hc
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingRmrControl struct {
        desc     string
        syncChan chan struct{}
 }
 
-func (tc *testingControl) ReadyCB(data interface{}) {
-       xapp.Logger.Info("testingControl(%s) ReadyCB", tc.desc)
+func (tc *testingRmrControl) ReadyCB(data interface{}) {
+       xapp.Logger.Info("testingRmrControl(%s) ReadyCB", tc.desc)
        tc.syncChan <- struct{}{}
        return
 }
 
-func (tc *testingControl) WaitCB() {
+func (tc *testingRmrControl) WaitCB() {
        <-tc.syncChan
 }
 
-func initTestingControl(desc string, rtfile string, port string) testingControl {
-       tc := testingControl{}
+func initTestingControl(desc string, rtfile string, port string) testingRmrControl {
+       tc := testingRmrControl{}
        os.Setenv("RMR_SEED_RT", rtfile)
        os.Setenv("RMR_SRC_ID", "localhost:"+port)
        xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
@@ -66,34 +170,58 @@ func initTestingControl(desc string, rtfile string, port string) testingControl
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type testingRmrControl struct {
-       testingControl
+type testingRmrStubControl struct {
+       testingRmrControl
        rmrClientTest *xapp.RMRClient
+       active        bool
+       msgCnt        uint64
 }
 
-func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
+func (tc *testingRmrStubControl) GetMsgCnt() uint64 {
+       return tc.msgCnt
+}
+
+func (tc *testingRmrStubControl) IncMsgCnt() {
+       tc.msgCnt++
+}
+
+func (tc *testingRmrStubControl) DecMsgCnt() {
+       if tc.msgCnt > 0 {
+               tc.msgCnt--
+       }
+}
+
+func (tc *testingRmrStubControl) TestMsgCnt(t *testing.T) {
+       if tc.GetMsgCnt() > 0 {
+               testError(t, "(%s) message count expected 0 but is %d", tc.desc, tc.GetMsgCnt())
+       }
+}
+
+func (tc *testingRmrStubControl) RmrSend(params *RMRParams) (err error) {
        //
        //NOTE: Do this way until xapp-frame sending is improved
        //
+       xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String())
        status := false
        i := 1
        for ; i <= 10 && status == false; i++ {
-               status = tc.rmrClientTest.SendMsg(params)
+               status = tc.rmrClientTest.SendMsg(params.RMRParams)
                if status == false {
-                       xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
+                       xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
                        time.Sleep(500 * time.Millisecond)
                }
        }
        if status == false {
-               err = errors.New("rmr.Send() failed")
-               tc.rmrClientTest.Free(params.Mbuf)
+               err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
+               xapp.Rmr.Free(params.Mbuf)
        }
        return
 }
 
-func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
-       tc := testingRmrControl{}
-       tc.testingControl = initTestingControl(desc, rtfile, port)
+func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrStubControl {
+       tc := testingRmrStubControl{}
+       tc.active = false
+       tc.testingRmrControl = initTestingControl(desc, rtfile, port)
        tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
        tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
        go tc.rmrClientTest.Start(consumer)
@@ -105,52 +233,75 @@ func initTestingRmrControl(desc string, rtfile string, port string, stat string,
 //
 //-----------------------------------------------------------------------------
 type testingMessageChannel struct {
-       rmrConChan chan *xapp.RMRParams
+       rmrConChan chan *RMRParams
 }
 
 func initTestingMessageChannel() testingMessageChannel {
        mc := testingMessageChannel{}
-       mc.rmrConChan = make(chan *xapp.RMRParams)
+       mc.rmrConChan = make(chan *RMRParams)
        return mc
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
+type xappTransaction struct {
+       tc   *testingXappControl
+       xid  string
+       meid *xapp.RMRMeid
+}
 
 type testingXappControl struct {
-       testingRmrControl
+       testingRmrStubControl
        testingMessageChannel
-       meid    *xapp.RMRMeid
        xid_seq uint64
-       xid     string
 }
 
 func (tc *testingXappControl) newXid() string {
-       tc.xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
+       var xid string
+       xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
        tc.xid_seq++
-       return tc.xid
+       return xid
+}
+
+func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
+       trans := &xappTransaction{}
+       trans.tc = tc
+       if xid == nil {
+               trans.xid = tc.newXid()
+       } else {
+               trans.xid = *xid
+       }
+       trans.meid = &xapp.RMRMeid{RanName: ranname}
+       return trans
 }
 
-func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
+func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+       msg := &RMRParams{params}
+
+       if params.Mtype == 55555 {
+               xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
+               tc.active = true
+               return
+       }
 
-       //if msg.Xid == tc.xid {
        if strings.Contains(msg.Xid, tc.desc) {
-               xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
+               xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
+               tc.IncMsgCnt()
                tc.rmrConChan <- msg
        } else {
-               xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid, tc.desc)
+               xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String())
        }
        return
 }
 
-func createNewXappControl(desc string, rtfile string, port string, stat string, ranname string) *testingXappControl {
+func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl {
        xappCtrl := &testingXappControl{}
-       xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
+       xappCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
        xappCtrl.testingMessageChannel = initTestingMessageChannel()
-       xappCtrl.meid = &xapp.RMRMeid{RanName: ranname}
-       xappCtrl.xid_seq = 0
-       xappCtrl.newXid()
+       xappCtrl.xid_seq = 1
        return xappCtrl
 }
 
@@ -158,19 +309,29 @@ func createNewXappControl(desc string, rtfile string, port string, stat string,
 //
 //-----------------------------------------------------------------------------
 type testingE2termControl struct {
-       testingRmrControl
+       testingRmrStubControl
        testingMessageChannel
 }
 
-func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) {
-       xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
+func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+       msg := &RMRParams{params}
+
+       if params.Mtype == 55555 {
+               xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
+               tc.active = true
+               return
+       }
+
+       xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
        tc.rmrConChan <- msg
        return
 }
 
 func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
        e2termCtrl := &testingE2termControl{}
-       e2termCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
+       e2termCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
        e2termCtrl.testingMessageChannel = initTestingMessageChannel()
        return e2termCtrl
 }
@@ -179,13 +340,13 @@ func createNewE2termControl(desc string, rtfile string, port string, stat string
 //
 //-----------------------------------------------------------------------------
 type testingMainControl struct {
-       testingControl
+       testingRmrControl
        c *Control
 }
 
 func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
        mainCtrl = &testingMainControl{}
-       mainCtrl.testingControl = initTestingControl(desc, rtfile, port)
+       mainCtrl.testingRmrControl = initTestingControl(desc, rtfile, port)
        mainCtrl.c = NewControl()
        xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
        go xapp.RunWithParams(mainCtrl.c, false)
@@ -228,10 +389,21 @@ var xappConn1 *testingXappControl
 var xappConn2 *testingXappControl
 var e2termConn *testingE2termControl
 var mainCtrl *testingMainControl
+var rtmgrHttp *testingHttpRtmgrControl
 
 func TestMain(m *testing.M) {
        xapp.Logger.Info("TestMain start")
 
+       //---------------------------------
+       //
+       //---------------------------------
+       rtmgrHttp = initTestingHttpRtmgrControl("RTMGRSTUB", "8989")
+       go rtmgrHttp.run()
+
+       //---------------------------------
+       //
+       //---------------------------------
+
        //
        //Cfg creation won't work like this as xapp-frame reads it during init.
        //
@@ -269,28 +441,61 @@ func TestMain(m *testing.M) {
        xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
 
        //---------------------------------
+       // Static routetable for rmr
+       //
        // NOTE: Routing table is configured so, that responses
        //       are duplicated to xapp1 and xapp2 instances.
        //       If XID is not matching xapp stub will just
        //       drop message. (Messages 12011, 12012, 12021, 12022)
+       //
+       // 14560 submgr
+       // 15560 e2term stub
+       // 13560 xapp1 stub
+       // 13660 xapp2 stub
+       //
        //---------------------------------
-       xapp.Logger.Info("### submgr main run ###")
 
-       subsrt := `newrt|start
+       allrt := `newrt|start
 mse|12010|-1|localhost:14560
 mse|12010,localhost:14560|-1|localhost:15560
 mse|12011,localhost:15560|-1|localhost:14560
-mse|12011|-1|localhost:13560;localhost:13660
 mse|12012,localhost:15560|-1|localhost:14560
-mse|12012|-1|localhost:13560;localhost:13660
+mse|12011,localhost:14560|-1|localhost:13660;localhost:13560
+mse|12012,localhost:14560|-1|localhost:13660;localhost:13560
 mse|12020|-1|localhost:14560
 mse|12020,localhost:14560|-1|localhost:15560
 mse|12021,localhost:15560|-1|localhost:14560
-mse|12021|-1|localhost:13560;localhost:13660
 mse|12022,localhost:15560|-1|localhost:14560
-mse|12022|-1|localhost:13560;localhost:13660
+mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
+mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
+mse|55555|-1|localhost:13660;localhost:13560,localhost:15560
 newrt|end
 `
+
+       //---------------------------------
+       //
+       //---------------------------------
+       xapp.Logger.Info("### submgr main run ###")
+
+       subsrt := allrt
+       /*
+               subsrt := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12010,localhost:14560|-1|localhost:15560
+          mse|12011,localhost:15560|-1|localhost:14560
+          mse|12011|-1|localhost:13560;localhost:13660
+          mse|12012,localhost:15560|-1|localhost:14560
+          mse|12012|-1|localhost:13560;localhost:13660
+          mse|12020|-1|localhost:14560
+          mse|12020,localhost:14560|-1|localhost:15560
+          mse|12021,localhost:15560|-1|localhost:14560
+          mse|12021|-1|localhost:13560;localhost:13660
+          mse|12022,localhost:15560|-1|localhost:14560
+          mse|12022|-1|localhost:13560;localhost:13660
+          newrt|end
+          `
+       */
+
        subrtfilename, _ := testCreateTmpFile(subsrt)
        defer os.Remove(subrtfilename)
        mainCtrl = createNewMainControl("main", subrtfilename, "14560")
@@ -300,19 +505,22 @@ newrt|end
        //---------------------------------
        xapp.Logger.Info("### xapp1 rmr run ###")
 
-       xapprt1 := `newrt|start
-mse|12010|-1|localhost:14560
-mse|12011|-1|localhost:13560
-mse|12012|-1|localhost:13560
-mse|12020|-1|localhost:14560
-mse|12021|-1|localhost:13560
-mse|12022|-1|localhost:13560
-newrt|end
-`
+       xapprt1 := allrt
+       /*
+               xapprt1 := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12011|-1|localhost:13560
+          mse|12012|-1|localhost:13560
+          mse|12020|-1|localhost:14560
+          mse|12021|-1|localhost:13560
+          mse|12022|-1|localhost:13560
+          newrt|end
+          `
+       */
 
        xapprtfilename1, _ := testCreateTmpFile(xapprt1)
        defer os.Remove(xapprtfilename1)
-       xappConn1 = createNewXappControl("xappConn1", xapprtfilename1, "13560", "RMRXAPP1STUB", "RAN_NAME_1")
+       xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB")
 
        //---------------------------------
        //
@@ -320,57 +528,76 @@ newrt|end
 
        xapp.Logger.Info("### xapp2 rmr run ###")
 
-       xapprt2 := `newrt|start
-mse|12010|-1|localhost:14560
-mse|12011|-1|localhost:13660
-mse|12012|-1|localhost:13660
-mse|12020|-1|localhost:14560
-mse|12021|-1|localhost:13660
-mse|12022|-1|localhost:13660
-newrt|end
-`
+       xapprt2 := allrt
+       /*
+               xapprt2 := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12011|-1|localhost:13660
+          mse|12012|-1|localhost:13660
+          mse|12020|-1|localhost:14560
+          mse|12021|-1|localhost:13660
+          mse|12022|-1|localhost:13660
+          newrt|end
+          `
+       */
 
        xapprtfilename2, _ := testCreateTmpFile(xapprt2)
        defer os.Remove(xapprtfilename2)
-       xappConn2 = createNewXappControl("xappConn2", xapprtfilename2, "13660", "RMRXAPP2STUB", "RAN_NAME_1")
+       xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB")
 
        //---------------------------------
        //
        //---------------------------------
        xapp.Logger.Info("### e2term rmr run ###")
 
-       e2termrt := `newrt|start
-mse|12010|-1|localhost:15560
-mse|12011|-1|localhost:14560
-mse|12012|-1|localhost:14560
-mse|12020|-1|localhost:15560
-mse|12021|-1|localhost:14560
-mse|12022|-1|localhost:14560
-newrt|end
-`
+       e2termrt := allrt
+       /*
+               e2termrt := `newrt|start
+          mse|12010|-1|localhost:15560
+          mse|12011|-1|localhost:14560
+          mse|12012|-1|localhost:14560
+          mse|12020|-1|localhost:15560
+          mse|12021|-1|localhost:14560
+          mse|12022|-1|localhost:14560
+          newrt|end
+          `
+       */
 
        e2termrtfilename, _ := testCreateTmpFile(e2termrt)
        defer os.Remove(e2termrtfilename)
-       e2termConn = createNewE2termControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
+       e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
 
        //---------------------------------
-       //
+       // Testing message sending
        //---------------------------------
-       http_handler := func(w http.ResponseWriter, r *http.Request) {
-               var req rtmgr_models.XappSubscriptionData
-               err := json.NewDecoder(r.Body).Decode(&req)
-               if err != nil {
-                       xapp.Logger.Error("%s", err.Error())
-               }
-               xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
+       var dummyBuf []byte = make([]byte, 100)
+
+       params := &RMRParams{&xapp.RMRParams{}}
+       params.Mtype = 55555
+       params.SubId = -1
+       params.Payload = dummyBuf
+       params.PayloadLen = 100
+       params.Meid = &xapp.RMRMeid{RanName: "NONEXISTINGRAN"}
+       params.Xid = "THISISTESTFORSTUBS"
+       params.Mbuf = nil
 
-               w.WriteHeader(200)
+       status := false
+       i := 1
+       for ; i <= 10 && status == false; i++ {
+               xapp.Rmr.Send(params.RMRParams, false)
+               if e2termConn.active == true && xappConn1.active == true && xappConn2.active == true {
+                       status = true
+                       break
+               } else {
+                       xapp.Logger.Info("Sleep 0.5 secs and try routes again")
+                       time.Sleep(500 * time.Millisecond)
+               }
        }
 
-       go func() {
-               http.HandleFunc("/", http_handler)
-               http.ListenAndServe("localhost:8989", nil)
-       }()
+       if status == false {
+               xapp.Logger.Error("Could not initialize routes")
+               os.Exit(1)
+       }
 
        //---------------------------------
        //