RICPLT-2962 Preparation for subs merge
[ric-plt/submgr.git] / pkg / control / main_test.go
index 3f1284c..dffff77 100644 (file)
 package control
 
 import (
-       "errors"
+       "encoding/json"
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "io/ioutil"
+       "net/http"
        "os"
+       "strconv"
+       "strings"
        "testing"
        "time"
 )
@@ -43,55 +47,169 @@ func (tc *testingControl) ReadyCB(data interface{}) {
        return
 }
 
+func (tc *testingControl) WaitCB() {
+       <-tc.syncChan
+}
+
+func initTestingControl(desc string, rtfile string, port string) testingControl {
+       tc := testingControl{}
+       os.Setenv("RMR_SEED_RT", rtfile)
+       os.Setenv("RMR_SRC_ID", "localhost:"+port)
+       xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+       xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
+       tc.desc = strings.ToUpper(desc)
+       tc.syncChan = make(chan struct{})
+       return tc
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
 type testingRmrControl struct {
        testingControl
        rmrClientTest *xapp.RMRClient
-       rmrConChan    chan *xapp.RMRParams
 }
 
-func (tc *testingRmrControl) Consume(msg *xapp.RMRParams) (err error) {
-       xapp.Logger.Info("testingRmrControl(%s) Consume", tc.desc)
-       tc.rmrConChan <- msg
-       return
-}
-
-func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
+func (tc *testingRmrControl) RmrSend(params *RMRParams) (err error) {
        //
        //NOTE: Do this way until xapp-frame sending is improved
        //
+       xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String())
        status := false
        i := 1
        for ; i <= 10 && status == false; i++ {
-               status = tc.rmrClientTest.SendMsg(params)
+               status = tc.rmrClientTest.SendMsg(params.RMRParams)
                if status == false {
-                       xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
+                       xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
                        time.Sleep(500 * time.Millisecond)
                }
        }
        if status == false {
-               err = errors.New("rmr.Send() failed")
-               tc.rmrClientTest.Free(params.Mbuf)
+               err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
+               xapp.Rmr.Free(params.Mbuf)
        }
        return
 }
 
-func createNewRmrControl(desc string, rtfile string, port string, stat string) *testingRmrControl {
-       os.Setenv("RMR_SEED_RT", rtfile)
-       os.Setenv("RMR_SRC_ID", "localhost:"+port)
-       xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
-       xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
-       newConn := &testingRmrControl{}
-       newConn.desc = desc
-       newConn.syncChan = make(chan struct{})
-       newConn.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
-       newConn.rmrConChan = make(chan *xapp.RMRParams)
-       newConn.rmrClientTest.SetReadyCB(newConn.ReadyCB, nil)
-       go newConn.rmrClientTest.Start(newConn)
-       <-newConn.syncChan
-       return newConn
+func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
+       tc := testingRmrControl{}
+       tc.testingControl = initTestingControl(desc, rtfile, port)
+       tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
+       tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
+       go tc.rmrClientTest.Start(consumer)
+       tc.WaitCB()
+       return tc
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingMessageChannel struct {
+       rmrConChan chan *RMRParams
+}
+
+func initTestingMessageChannel() testingMessageChannel {
+       mc := testingMessageChannel{}
+       mc.rmrConChan = make(chan *RMRParams)
+       return mc
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type xappTransaction struct {
+       tc   *testingXappControl
+       xid  string
+       meid *xapp.RMRMeid
+}
+
+type testingXappControl struct {
+       testingRmrControl
+       testingMessageChannel
+       xid_seq uint64
+}
+
+func (tc *testingXappControl) newXid() string {
+       var xid string
+       xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
+       tc.xid_seq++
+       return xid
+}
+
+func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
+       trans := &xappTransaction{}
+       trans.tc = tc
+       if xid == nil {
+               trans.xid = tc.newXid()
+       } else {
+               trans.xid = *xid
+       }
+       trans.meid = &xapp.RMRMeid{RanName: ranname}
+       return trans
+}
+
+func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+       msg := &RMRParams{params}
+
+       if strings.Contains(msg.Xid, tc.desc) {
+               xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
+               tc.rmrConChan <- msg
+       } else {
+               xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String())
+       }
+       return
+}
+
+func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl {
+       xappCtrl := &testingXappControl{}
+       xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
+       xappCtrl.testingMessageChannel = initTestingMessageChannel()
+       xappCtrl.xid_seq = 1
+       return xappCtrl
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingE2termControl struct {
+       testingRmrControl
+       testingMessageChannel
+}
+
+func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+       msg := &RMRParams{params}
+       xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
+       tc.rmrConChan <- msg
+       return
+}
+
+func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
+       e2termCtrl := &testingE2termControl{}
+       e2termCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
+       e2termCtrl.testingMessageChannel = initTestingMessageChannel()
+       return e2termCtrl
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingMainControl struct {
+       testingControl
+       c *Control
+}
+
+func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
+       mainCtrl = &testingMainControl{}
+       mainCtrl.testingControl = initTestingControl(desc, rtfile, port)
+       mainCtrl.c = NewControl()
+       xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
+       go xapp.RunWithParams(mainCtrl.c, false)
+       mainCtrl.WaitCB()
+       return mainCtrl
 }
 
 //-----------------------------------------------------------------------------
@@ -103,6 +221,11 @@ func testError(t *testing.T, pattern string, args ...interface{}) {
        t.Errorf(fmt.Sprintf(pattern, args...))
 }
 
+func testLog(t *testing.T, pattern string, args ...interface{}) {
+       xapp.Logger.Info(fmt.Sprintf(pattern, args...))
+       t.Logf(fmt.Sprintf(pattern, args...))
+}
+
 func testCreateTmpFile(str string) (string, error) {
        file, err := ioutil.TempFile("/tmp", "*.rt")
        if err != nil {
@@ -120,118 +243,208 @@ func testCreateTmpFile(str string) (string, error) {
 //
 //-----------------------------------------------------------------------------
 
-var xappConn *testingRmrControl
-var e2termConn *testingRmrControl
+var xappConn1 *testingXappControl
+var xappConn2 *testingXappControl
+var e2termConn *testingE2termControl
+var mainCtrl *testingMainControl
 
 func TestMain(m *testing.M) {
        xapp.Logger.Info("TestMain start")
 
+       //---------------------------------
+       //
+       //---------------------------------
+       http_handler := func(w http.ResponseWriter, r *http.Request) {
+               var req rtmgr_models.XappSubscriptionData
+               err := json.NewDecoder(r.Body).Decode(&req)
+               if err != nil {
+                       xapp.Logger.Error("%s", err.Error())
+               }
+               xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
+
+               w.WriteHeader(200)
+       }
+
+       go func() {
+               http.HandleFunc("/", http_handler)
+               http.ListenAndServe("localhost:8989", nil)
+       }()
+
+       //---------------------------------
+       //
+       //---------------------------------
+
        //
        //Cfg creation won't work like this as xapp-frame reads it during init.
        //
        /*
-            cfgstr:=`{
-              "local": {
-                  "host": ":8080"
-              },
-              "logger": {
-                  "level": 4
-              },
-              "rmr": {
-                 "protPort": "tcp:14560",
-                 "maxSize": 4096,
-                 "numWorkers": 1,
-                 "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
-                 "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
-              },
-              "db": {
-                  "host": "localhost",
-                  "port": 6379,
-                  "namespaces": ["sdl", "rnib"]
-              }
-          }`
-
-            cfgfilename,_ := testCreateTmpFile(cfgstr)
-            defer os.Remove(cfgfilename)
-            os.Setenv("CFG_FILE", cfgfilename)
+           cfgstr:=`{
+             "local": {
+                 "host": ":8080"
+             },
+             "logger": {
+                 "level": 4
+             },
+             "rmr": {
+                "protPort": "tcp:14560",
+                "maxSize": 4096,
+                "numWorkers": 1,
+                "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
+                "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
+             },
+             "db": {
+                 "host": "localhost",
+                 "port": 6379,
+                 "namespaces": ["sdl", "rnib"]
+             },
+                "rtmgr" : {
+                  "HostAddr" : "localhost",
+                  "port" : "8989",
+                  "baseUrl" : "/"
+                }
+          `
+
+          cfgfilename,_ := testCreateTmpFile(cfgstr)
+          defer os.Remove(cfgfilename)
+          os.Setenv("CFG_FILE", cfgfilename)
        */
        xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
 
        //---------------------------------
+       // Static routetable for rmr
+       //
+       // NOTE: Routing table is configured so, that responses
+       //       are duplicated to xapp1 and xapp2 instances.
+       //       If XID is not matching xapp stub will just
+       //       drop message. (Messages 12011, 12012, 12021, 12022)
+       //
+       // 14560 submgr
+       // 15560 e2term stub
+       // 13560 xapp1 stub
+       // 13660 xapp2 stub
        //
        //---------------------------------
-       xapp.Logger.Info("### submgr main run ###")
 
-       subsrt := `newrt|start
+       allrt := `newrt|start
 mse|12010|-1|localhost:14560
 mse|12010,localhost:14560|-1|localhost:15560
 mse|12011,localhost:15560|-1|localhost:14560
-mse|12011|-1|localhost:13560
 mse|12012,localhost:15560|-1|localhost:14560
-mse|12012|-1|localhost:13560
+mse|12011,localhost:14560|-1|localhost:13660;localhost:13560
+mse|12012,localhost:14560|-1|localhost:13660;localhost:13560
 mse|12020|-1|localhost:14560
 mse|12020,localhost:14560|-1|localhost:15560
 mse|12021,localhost:15560|-1|localhost:14560
-mse|12021|-1|localhost:13560
 mse|12022,localhost:15560|-1|localhost:14560
-mse|12022|-1|localhost:13560
+mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
+mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
 newrt|end
 `
 
+       //---------------------------------
+       //
+       //---------------------------------
+       xapp.Logger.Info("### submgr main run ###")
+
+       subsrt := allrt
+       /*
+               subsrt := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12010,localhost:14560|-1|localhost:15560
+          mse|12011,localhost:15560|-1|localhost:14560
+          mse|12011|-1|localhost:13560;localhost:13660
+          mse|12012,localhost:15560|-1|localhost:14560
+          mse|12012|-1|localhost:13560;localhost:13660
+          mse|12020|-1|localhost:14560
+          mse|12020,localhost:14560|-1|localhost:15560
+          mse|12021,localhost:15560|-1|localhost:14560
+          mse|12021|-1|localhost:13560;localhost:13660
+          mse|12022,localhost:15560|-1|localhost:14560
+          mse|12022|-1|localhost:13560;localhost:13660
+          newrt|end
+          `
+       */
+
        subrtfilename, _ := testCreateTmpFile(subsrt)
        defer os.Remove(subrtfilename)
-       os.Setenv("RMR_SEED_RT", subrtfilename)
-       xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+       mainCtrl = createNewMainControl("main", subrtfilename, "14560")
+
+       //---------------------------------
+       //
+       //---------------------------------
+       xapp.Logger.Info("### xapp1 rmr run ###")
 
-       mainCtrl := &testingControl{}
-       mainCtrl.desc = "main"
-       mainCtrl.syncChan = make(chan struct{})
+       xapprt1 := allrt
+       /*
+               xapprt1 := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12011|-1|localhost:13560
+          mse|12012|-1|localhost:13560
+          mse|12020|-1|localhost:14560
+          mse|12021|-1|localhost:13560
+          mse|12022|-1|localhost:13560
+          newrt|end
+          `
+       */
 
-       os.Setenv("RMR_SRC_ID", "localhost:14560")
-       c := NewControl()
-       c.skipRouteUpdate = true
-       xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
-       go xapp.RunWithParams(c, false)
-       <-mainCtrl.syncChan
+       xapprtfilename1, _ := testCreateTmpFile(xapprt1)
+       defer os.Remove(xapprtfilename1)
+       xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB")
 
        //---------------------------------
        //
        //---------------------------------
-       xapp.Logger.Info("### xapp rmr run ###")
 
-       xapprt := `newrt|start
-mse|12010|-1|localhost:14560
-mse|12011|-1|localhost:13560
-mse|12012|-1|localhost:13560
-mse|12020|-1|localhost:14560
-mse|12021|-1|localhost:13560
-mse|12022|-1|localhost:13560
-newrt|end
-`
+       xapp.Logger.Info("### xapp2 rmr run ###")
+
+       xapprt2 := allrt
+       /*
+               xapprt2 := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12011|-1|localhost:13660
+          mse|12012|-1|localhost:13660
+          mse|12020|-1|localhost:14560
+          mse|12021|-1|localhost:13660
+          mse|12022|-1|localhost:13660
+          newrt|end
+          `
+       */
 
-       xapprtfilename, _ := testCreateTmpFile(xapprt)
-       defer os.Remove(xapprtfilename)
-       xappConn = createNewRmrControl("xappConn", xapprtfilename, "13560", "RMRXAPPSTUB")
+       xapprtfilename2, _ := testCreateTmpFile(xapprt2)
+       defer os.Remove(xapprtfilename2)
+       xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB")
 
        //---------------------------------
        //
        //---------------------------------
        xapp.Logger.Info("### e2term rmr run ###")
 
-       e2termrt := `newrt|start
-mse|12010|-1|localhost:15560
-mse|12011|-1|localhost:14560
-mse|12012|-1|localhost:14560
-mse|12020|-1|localhost:15560
-mse|12021|-1|localhost:14560
-mse|12022|-1|localhost:14560
-newrt|end
-`
+       e2termrt := allrt
+       /*
+               e2termrt := `newrt|start
+          mse|12010|-1|localhost:15560
+          mse|12011|-1|localhost:14560
+          mse|12012|-1|localhost:14560
+          mse|12020|-1|localhost:15560
+          mse|12021|-1|localhost:14560
+          mse|12022|-1|localhost:14560
+          newrt|end
+          `
+       */
 
        e2termrtfilename, _ := testCreateTmpFile(e2termrt)
        defer os.Remove(e2termrtfilename)
-       e2termConn = createNewRmrControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
+       e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
+
+       //---------------------------------
+       // Stupid sleep to try improve robustness
+       // due: http handler and rmr routes init delays
+       //---------------------------------
+       <-time.After(2 * time.Second)
 
+       //---------------------------------
+       //
+       //---------------------------------
        code := m.Run()
        os.Exit(code)
 }