var e2tSubReqTimeout time.Duration
var e2tSubDelReqTime time.Duration
var e2tRecvMsgTimeout time.Duration
+var waitRouteCleanup_ms time.Duration
var e2tMaxSubReqTryCount uint64 // Initial try + retry
var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
var readSubsFromDb string
e2tRecvMsgTimeout = 2000 * 1000000
}
xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
+
+ // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
+ // value 100ms used currently only in unittests.
+ waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
+ if waitRouteCleanup_ms == 0 {
+ waitRouteCleanup_ms = 5000 * 1000000
+ }
+ xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
+
e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
if e2tMaxSubReqTryCount == 0 {
e2tMaxSubReqTryCount = 1
}
xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
+
e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
if e2tMaxSubDelReqTryCount == 0 {
e2tMaxSubDelReqTryCount = 1
}
err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
xapp.Logger.Error("%s", err.Error())
- c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
+ c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
return nil, err
}
xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
- c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
+ c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
return nil
}
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
if valid == false {
- c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
+ c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
}
c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
// RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
- c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
+ c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
c.registry.UpdateSubscriptionToDb(subs, c)
parentTrans.SendEvent(nil, 0)
}
// Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
c.WriteSubscriptionToDb(subs)
+
for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
desc := fmt.Sprintf("(retry %d)", retries)
if retries == 0 {
go func() {
if waitRouteClean > 0 {
+ xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
time.Sleep(waitRouteClean)
}
import (
"encoding/json"
"fmt"
- "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
- "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststube2ap"
- "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- "github.com/stretchr/testify/assert"
"reflect"
"strconv"
"strings"
+ "sync"
"testing"
"time"
+
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststube2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "github.com/stretchr/testify/assert"
)
var sdlShouldReturnError bool = false
register map[uint32]*Subscription
subIds []uint32
lastAllocatedSubId uint32
+ marshalLock sync.Mutex
}
var mock *Mock
var key string
var val string
+ m.marshalLock.Lock()
+ defer m.marshalLock.Unlock()
+
if sdlShouldReturnError == true {
return GetSdlError()
}
func (mc *testingSubmgrControl) wait_registry_empty(t *testing.T, secs int) bool {
cnt := int(0)
i := 1
- for ; i <= secs*2; i++ {
+ for ; i <= secs*10; i++ {
cnt = len(mc.c.registry.register)
if cnt == 0 {
return true
}
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(100 * time.Millisecond)
}
mc.TestError(t, "(submgr) no registry empty within %d secs: %d", secs, cnt)
return false
func (mc *testingSubmgrControl) wait_registry_next_subid_change(t *testing.T, origSubId uint32, secs int) (uint32, bool) {
i := 1
- for ; i <= secs*2; i++ {
+ for ; i <= secs*10; i++ {
mc.c.registry.mutex.Lock()
currSubId := mc.c.registry.subIds[0]
mc.c.registry.mutex.Unlock()
if currSubId != origSubId {
return currSubId, true
}
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(100 * time.Millisecond)
}
mc.TestError(t, "(submgr) no subId change within %d secs", secs)
return 0, false
func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId uint32, secs int) bool {
var subs *Subscription
i := 1
- for ; i <= secs*2; i++ {
+ for ; i <= secs*10; i++ {
subs = mc.c.registry.GetSubscription(e2SubsId)
if subs == nil {
return true
}
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(100 * time.Millisecond)
}
if subs != nil {
mc.TestError(t, "(submgr) no clean within %d secs: %s", secs, subs.String())
}
func (mc *testingSubmgrControl) wait_multi_subs_clean(t *testing.T, e2SubsIds []uint32, secs int) bool {
- var subs *Subscription
- var purgedSubscriptions int
- i := 1
- k := 0
- for ; i <= secs*2; i++ {
+
+ purgedSubscriptions := 0
+
+ for i := 1; i <= secs*10; i++ {
purgedSubscriptions = 0
- for k = 0; k <= len(e2SubsIds); i++ {
- subs = mc.c.registry.GetSubscription(e2SubsIds[k])
+ for k := 0; k <= len(e2SubsIds); i++ {
+ subs := mc.c.registry.GetSubscription(e2SubsIds[k])
if subs == nil {
mc.TestLog(t, "(submgr) subscriber purged for esSubsId %v", e2SubsIds[k])
purgedSubscriptions += 1
if purgedSubscriptions == len(e2SubsIds) {
return true
- } else {
- continue
}
- } else {
- mc.TestLog(t, "(submgr) subscriber %s no clean within %d secs: subs(N/A) - purged subscriptions %v", subs.String(), secs, purgedSubscriptions)
- time.Sleep(500 * time.Millisecond)
}
}
+ mc.TestLog(t, "(submgr) subscriptions pending purging %v/%v after %d msecs", purgedSubscriptions, len(e2SubsIds), i+500)
+ time.Sleep(100 * time.Millisecond)
}
mc.TestError(t, "(submgr) no clean within %d secs: subs(N/A) - %v/%v subscriptions found still", secs, purgedSubscriptions, len(e2SubsIds))
func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId uint32, secs int) bool {
var trans TransactionIf
i := 1
- for ; i <= secs*2; i++ {
+ for ; i <= secs*10; i++ {
subs := mc.c.registry.GetSubscription(e2SubsId)
if subs == nil {
return true
if trans == nil {
return true
}
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(100 * time.Millisecond)
}
if trans != nil {
mc.TestError(t, "(submgr) no clean within %d secs: %s", secs, trans.String())
}
i := 1
- for ; i <= secs*2; i++ {
+ for ; i <= secs*10; i++ {
curr := subs.EpList.Size()
if curr != orig {
return curr, true
}
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(100 * time.Millisecond)
}
mc.TestError(t, "(submgr) no subs %d entrypoint cnt change within %d secs", origSubId, secs)
return 0, false
func (mc *testingSubmgrControl) wait_msgcounter_change(t *testing.T, orig uint64, secs int) (uint64, bool) {
i := 1
- for ; i <= secs*2; i++ {
+ for ; i <= secs*10; i++ {
curr := mc.c.CntRecvMsg
if curr != orig {
return curr, true
}
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(100 * time.Millisecond)
}
mc.TestError(t, "(submgr) no msg counter change within %d secs", secs)
return 0, false
// mainCtrl.SimulateRestart(t)
// xapp.Logger.Debug("mainCtrl.SimulateRestart done")
- // Delete subscription 1
+ // Delete subscription 1, and wait until it has removed the first endpoint
+ subepcnt := mainCtrl.get_subs_entrypoint_cnt(t, e2SubsId1)
xappConn1.SendRESTSubsDelReq(t, &restSubId1)
+ mainCtrl.wait_subs_entrypoint_cnt_change(t, e2SubsId1, subepcnt, 10)
// When SDL support for the REST Interface is added
// the submgr restart statement below should be removed
// mainCtrl.SimulateRestart(t)
// xapp.Logger.Debug("mainCtrl.SimulateRestart done")
-
queryXappSubscription(t, int64(e2SubsId1), "RAN_NAME_1", []string{"localhost:13660"})
// Delete subscription 2
mainCtrl.VerifyCounterValues(t)
}
-/*
func TestRESTTwoReportSubReqAndSubDelOkNoActParams(t *testing.T) {
subReqCount := 2
mainCtrl.VerifyCounterValues(t)
}
-*/
-/*
+
func TestRESTFullAmountReportSubReqAndSubDelOk(t *testing.T) {
subReqCount := 19
mainCtrl.VerifyCounterValues(t)
}
-*/
func TestRESTSubReqReportSameActionDiffEventTriggerDefinitionLen(t *testing.T) {
CaseBegin("TestRESTSubReqReportSameActionDiffEventTriggerDefinitionLen")
// Callback handler for subscription response notifications
//-----------------------------------------------------------------------------
func (tc *E2Stub) SubscriptionRespHandler(resp *clientmodel.SubscriptionResponse) {
- if tc.subscriptionId == *resp.SubscriptionID {
+ if tc.subscriptionId == "SUBSCRIPTIONID NOT SET" {
+ tc.Info("REST notification received for %v while no SubscriptionID was not set for InstanceID=%v, RequestorID=%v (%v)",
+ *resp.SubscriptionID, *resp.SubscriptionInstances[0].InstanceID, *resp.SubscriptionInstances[0].RequestorID, tc)
+ tc.CallBackNotification <- *resp.SubscriptionInstances[0].InstanceID
+ } else if tc.subscriptionId == *resp.SubscriptionID {
tc.Info("REST notification received SubscriptionID=%s, InstanceID=%v, RequestorID=%v (%v)",
*resp.SubscriptionID, *resp.SubscriptionInstances[0].InstanceID, *resp.SubscriptionInstances[0].RequestorID, tc)
tc.CallBackNotification <- *resp.SubscriptionInstances[0].InstanceID
return ""
}
+ tc.subscriptionId = "SUBSCIPTIONID NOT SET"
+
resp, err := xapp.Subscription.Subscribe(¶ms.SubsReqParams)
if err != nil {
// Swagger generated code makes checks for the values that are inserted the subscription function
"e2tRecvMsgTimeout_ms": 2000,
"e2tMaxSubReqTryCount": 2,
"e2tMaxSubDelReqTryCount": 2,
+ "waitRouteCleanup_ms": 100,
"readSubsFromDb": "true",
"subscription": {
"host": "localhost:8088",