import (
"fmt"
+ "sync"
+ "time"
+
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- "sync"
- "time"
)
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
+type RESTSubscription struct {
+ xAppRmrEndPoint string
+ Meid string
+ InstanceIds []uint32
+ SubReqOngoing bool
+ SubDelReqOngoing bool
+}
+
+func (r *RESTSubscription) AddInstanceId(instanceId uint32) {
+ r.InstanceIds = append(r.InstanceIds, instanceId)
+}
+
+func (r *RESTSubscription) SetProcessed() {
+ r.SubReqOngoing = false
+}
+
+func (r *RESTSubscription) DeleteInstanceId(instanceId uint32) {
+ r.InstanceIds = r.InstanceIds[1:]
+}
+
type Registry struct {
- mutex sync.Mutex
- register map[uint32]*Subscription
- subIds []uint32
- rtmgrClient *RtmgrClient
+ mutex sync.Mutex
+ register map[uint32]*Subscription
+ subIds []uint32
+ rtmgrClient *RtmgrClient
+ restSubscriptions map[string]*RESTSubscription
}
func (r *Registry) Initialize() {
r.register = make(map[uint32]*Subscription)
+ r.restSubscriptions = make(map[string]*RESTSubscription)
+
var i uint32
for i = 1; i < 65535; i++ {
r.subIds = append(r.subIds, i)
}
}
+func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ newRestSubscription := RESTSubscription{}
+ newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
+ newRestSubscription.Meid = *maid
+ newRestSubscription.SubReqOngoing = true
+ newRestSubscription.SubDelReqOngoing = false
+ r.restSubscriptions[*restSubId] = &newRestSubscription
+ xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
+ return &newRestSubscription, nil
+}
+
+func (r *Registry) DeleteRESTSubscription(restSubId *string) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ delete(r.restSubscriptions, *restSubId)
+ xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
+}
+
+func (r *Registry) GetRESTSubscription(restSubId string) (*RESTSubscription, error) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
+ // Subscription deletion is not allowed if prosessing subscription request in not ready
+ if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
+ restSubscription.SubDelReqOngoing = true
+ r.restSubscriptions[restSubId] = restSubscription
+ return restSubscription, nil
+ } else {
+ return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
+ }
+ return restSubscription, nil
+ }
+ return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
+}
+
func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
} else if endPointFound == true {
// Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
subs.RetryFromXapp = true
- xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
+ xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
+ c.UpdateCounter(cDuplicateE2SubReq)
return subs, nil
}
defer subs.mutex.Unlock()
epamount := subs.EpList.Size()
- xapp.Logger.Info("AssignToSubscription subs.EpList.Size() = %v", subs.EpList.Size())
+ xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
r.mutex.Unlock()
//
go func() {
if waitRouteClean > 0 {
+ xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
time.Sleep(waitRouteClean)
}