package control
import (
+ "fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"sync"
+ "time"
)
-type Subscription struct {
- Seq uint16
- Confirmed bool
-}
-
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
type Registry struct {
- register map[uint16]*Subscription
- counter uint16
- mutex sync.Mutex
+ mutex sync.Mutex
+ register map[uint16]*Subscription
+ subIds []uint16
+ rtmgrClient *RtmgrClient
}
-// This method should run as a constructor
-func (r *Registry) Initialize(seedsn uint16) {
+func (r *Registry) Initialize() {
r.register = make(map[uint16]*Subscription)
- r.counter = seedsn
+ var i uint16
+ for i = 0; i < 65535; i++ {
+ r.subIds = append(r.subIds, i+1)
+ }
}
-// Reserves and returns the next free sequence number
-func (r *Registry) ReserveSequenceNumber() (uint16, bool) {
- // Check is current SequenceNumber valid
+func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
- sequenceNumber := r.counter
- if _, ok := r.register[sequenceNumber]; ok {
- xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber)
- return sequenceNumber, false
+
+ var sequenceNumber uint16
+
+ //
+ // Allocate subscription
+ //
+ if len(r.subIds) > 0 {
+ sequenceNumber = r.subIds[0]
+ r.subIds = r.subIds[1:]
+ if _, ok := r.register[sequenceNumber]; ok == true {
+ r.subIds = append(r.subIds, sequenceNumber)
+ return nil, fmt.Errorf("Registry: Failed to reserves subscription")
+ }
+ } else {
+ return nil, fmt.Errorf("Registry: Failed to reserves subscription no free ids")
}
- r.register[sequenceNumber] = &Subscription{sequenceNumber, false}
+ subs := &Subscription{
+ registry: r,
+ Seq: sequenceNumber,
+ Meid: trans.Meid,
+ }
+
+ //
+ // Add to subscription
+ //
+ subs.mutex.Lock()
+ defer subs.mutex.Unlock()
- // Allocate next SequenceNumber value
- if r.counter == 65535 {
- r.counter = 0
+ if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
+ r.subIds = append(r.subIds, sequenceNumber)
+ return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
+ }
+ epamount := subs.EpList.Size()
+
+ r.mutex.Unlock()
+ //
+ // Subscription route updates
+ //
+ var err error
+ if epamount == 1 {
+ subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
+ err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
} else {
- r.counter++
+ subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
+ err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
}
- return sequenceNumber, true
+ r.mutex.Lock()
+
+ if err != nil {
+ r.subIds = append(r.subIds, sequenceNumber)
+ return nil, err
+ }
+ subs.SubReqMsg = subReqMsg
+
+ r.register[sequenceNumber] = subs
+ xapp.Logger.Debug("Registry: Create %s", subs.String())
+ xapp.Logger.Debug("Registry: substable=%v", r.register)
+ return subs, nil
}
-// This function checks the validity of the given subscription id
-func (r *Registry) IsValidSequenceNumber(sn uint16) bool {
+func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
r.mutex.Lock()
defer r.mutex.Unlock()
- xapp.Logger.Debug("Registry map: %v", r.register)
- if _, ok := r.register[sn]; ok {
- return true
+ subs.mutex.Lock()
+ defer subs.mutex.Unlock()
+
+ delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
+ epamount := subs.EpList.Size()
+
+ //
+ // If last endpoint remove from register map
+ //
+ if epamount == 0 {
+ if _, ok := r.register[subs.Seq]; ok {
+ xapp.Logger.Debug("Registry: Delete %s", subs.String())
+ delete(r.register, subs.Seq)
+ xapp.Logger.Debug("Registry: substable=%v", r.register)
+ }
+ }
+ r.mutex.Unlock()
+
+ //
+ // Wait some time before really do route updates
+ //
+ if waitRouteClean > 0 {
+ subs.mutex.Unlock()
+ time.Sleep(waitRouteClean)
+ subs.mutex.Lock()
+ }
+
+ xapp.Logger.Info("Registry: Cleaning %s", subs.String())
+
+ //
+ // Subscription route updates
+ //
+ if delStatus {
+ if epamount == 0 {
+ tmpList := RmrEndpointList{}
+ tmpList.AddEndpoint(trans.GetEndpoint())
+ subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
+ r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ } else {
+ subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
+ r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ }
}
- return false
-}
-// This function sets the give id as confirmed in the register
-func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
r.mutex.Lock()
- defer r.mutex.Unlock()
- r.register[sn].Confirmed = true
+ //
+ // If last endpoint free seq nro
+ //
+ if epamount == 0 {
+ r.subIds = append(r.subIds, subs.Seq)
+ }
+
+ return nil
}
-//This function sets the given id as unused in the register
-func (r *Registry) deleteSubscription(sn uint16) {
+func (r *Registry) GetSubscription(sn uint16) *Subscription {
r.mutex.Lock()
defer r.mutex.Unlock()
- r.register[sn].Confirmed = false
+ if _, ok := r.register[sn]; ok {
+ return r.register[sn]
+ }
+ return nil
}
-//This function releases the given id as unused in the register
-func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
- if _, ok := r.register[sn]; ok {
- delete(r.register, sn)
- return true
- } else {
- return false
+ for _, id := range ids {
+ if _, ok := r.register[id]; ok {
+ return r.register[id], nil
+ }
}
+ return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
}