2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
30 //-----------------------------------------------------------------------------
32 //-----------------------------------------------------------------------------
34 type Registry struct {
36 register map[uint32]*Subscription
38 rtmgrClient *RtmgrClient
41 func (r *Registry) Initialize() {
42 r.register = make(map[uint32]*Subscription)
44 for i = 0; i < 65535; i++ {
45 r.subIds = append(r.subIds, i+1)
49 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
50 if len(r.subIds) > 0 {
51 sequenceNumber := r.subIds[0]
52 r.subIds = r.subIds[1:]
53 if _, ok := r.register[sequenceNumber]; ok == true {
54 r.subIds = append(r.subIds, sequenceNumber)
55 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
57 subs := &Subscription{
64 subs.ReqId.Seq = sequenceNumber
66 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
67 r.subIds = append(r.subIds, subs.ReqId.Seq)
68 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
73 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
76 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
78 for _, subs := range r.register {
79 if subs.IsMergeable(trans, subReqMsg) {
82 // check if there has been race conditions
85 //subs has been set to invalid
86 if subs.valid == false {
90 // If size is zero, entry is to be deleted
91 if subs.EpList.Size() == 0 {
95 // try to add to endpointlist.
96 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
102 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
109 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
113 defer r.mutex.Unlock()
116 // Check validity of subscription action types
118 actionType, err := r.CheckActionTypes(subReqMsg)
120 xapp.Logger.Debug("CREATE %s", err)
125 // Find possible existing Policy subscription
127 if actionType == e2ap.E2AP_ActionTypePolicy {
128 if subs, ok := r.register[subReqMsg.RequestId.Seq]; ok {
129 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
130 subs.SetCachedResponse(nil, true)
135 subs := r.findExistingSubs(trans, subReqMsg)
137 subs, err = r.allocateSubs(trans, subReqMsg)
145 // Add to subscription
148 defer subs.mutex.Unlock()
150 epamount := subs.EpList.Size()
154 // Subscription route updates
157 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
158 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
160 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
161 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
167 r.subIds = append(r.subIds, subs.ReqId.Seq)
173 r.register[subs.ReqId.Seq] = subs
175 xapp.Logger.Debug("CREATE %s", subs.String())
176 xapp.Logger.Debug("Registry: substable=%v", r.register)
180 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
181 var reportFound bool = false
182 var policyFound bool = false
184 for _, acts := range subReqMsg.ActionSetups {
185 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
188 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
192 if reportFound == true && policyFound == true {
193 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
195 if reportFound == true {
196 return e2ap.E2AP_ActionTypeReport, nil
198 if policyFound == true {
199 return e2ap.E2AP_ActionTypePolicy, nil
201 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
204 // TODO: Works with concurrent calls, but check if can be improved
205 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
208 defer r.mutex.Unlock()
210 defer subs.mutex.Unlock()
212 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
213 epamount := subs.EpList.Size()
214 seqId := subs.ReqId.Seq
216 if delStatus == false {
221 if waitRouteClean > 0 {
222 time.Sleep(waitRouteClean)
226 defer subs.mutex.Unlock()
227 xapp.Logger.Info("CLEAN %s", subs.String())
231 // Subscription route delete
233 tmpList := RmrEndpointList{}
234 tmpList.AddEndpoint(trans.GetEndpoint())
235 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
236 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
239 // Subscription release
242 defer r.mutex.Unlock()
244 if _, ok := r.register[seqId]; ok {
245 xapp.Logger.Debug("RELEASE %s", subs.String())
246 delete(r.register, seqId)
247 xapp.Logger.Debug("Registry: substable=%v", r.register)
249 r.subIds = append(r.subIds, seqId)
251 } else if subs.EpList.Size() > 0 {
253 // Subscription route updates
255 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
256 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
264 func (r *Registry) GetSubscription(sn uint32) *Subscription {
266 defer r.mutex.Unlock()
267 if _, ok := r.register[sn]; ok {
268 return r.register[sn]
273 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
275 defer r.mutex.Unlock()
276 for _, id := range ids {
277 if _, ok := r.register[id]; ok {
278 return r.register[id], nil
281 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)