-func (tc *testingRmrControl) Lock() {
- tc.mutex.Lock()
-}
-
-func (tc *testingRmrControl) Unlock() {
- tc.mutex.Unlock()
-}
-
-func (tc *testingRmrControl) GetDesc() string {
- return tc.desc
-}
-
-func (tc *testingRmrControl) ReadyCB(data interface{}) {
- xapp.Logger.Info("testingRmrControl(%s) ReadyCB", tc.GetDesc())
- tc.syncChan <- struct{}{}
- return
-}
-
-func (tc *testingRmrControl) WaitCB() {
- <-tc.syncChan
-}
-
-func (tc *testingRmrControl) init(desc string, rtfile string, port string) {
- os.Setenv("RMR_SEED_RT", rtfile)
- os.Setenv("RMR_SRC_ID", "localhost:"+port)
- xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
- xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID"))
- tc.desc = strings.ToUpper(desc)
- tc.syncChan = make(chan struct{})
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type testingRmrStubControl struct {
- testingRmrControl
- rmrConChan chan *RMRParams
- rmrClientTest *xapp.RMRClient
- active bool
- msgCnt uint64
-}
-
-func (tc *testingRmrStubControl) GetMsgCnt() uint64 {
- return tc.msgCnt
-}
-
-func (tc *testingRmrStubControl) IncMsgCnt() {
- tc.msgCnt++
-}
-
-func (tc *testingRmrStubControl) DecMsgCnt() {
- if tc.msgCnt > 0 {
- tc.msgCnt--
- }
-}
-
-func (tc *testingRmrStubControl) TestMsgCnt(t *testing.T) {
- if tc.GetMsgCnt() > 0 {
- testError(t, "(%s) message count expected 0 but is %d", tc.GetDesc(), tc.GetMsgCnt())
- }
-}
-
-func (tc *testingRmrStubControl) RmrSend(params *RMRParams) (err error) {
- //
- //NOTE: Do this way until xapp-frame sending is improved
- //
- xapp.Logger.Info("(%s) RmrSend %s", tc.GetDesc(), params.String())
- status := false
- i := 1
- for ; i <= 10 && status == false; i++ {
- status = tc.rmrClientTest.SendMsg(params.RMRParams)
- if status == false {
- xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.GetDesc(), i, params.String())
- time.Sleep(500 * time.Millisecond)
- }
- }
- if status == false {
- err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.GetDesc(), i, params.String())
- xapp.Rmr.Free(params.Mbuf)
- }
- return
-}
-
-func (tc *testingRmrStubControl) init(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) {
- tc.active = false
- tc.testingRmrControl.init(desc, rtfile, port)
- tc.rmrConChan = make(chan *RMRParams)
- tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
- tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
- go tc.rmrClientTest.Start(consumer)
- tc.WaitCB()
- allRmrStubs = append(allRmrStubs, tc)
-}
-
-var allRmrStubs []*testingRmrStubControl
-