2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 //-----------------------------------------------------------------------------
33 //-----------------------------------------------------------------------------
35 type Registry struct {
37 register map[uint32]*Subscription
39 rtmgrClient *RtmgrClient
42 func (r *Registry) Initialize() {
43 r.register = make(map[uint32]*Subscription)
45 for i = 0; i < 65535; i++ {
46 r.subIds = append(r.subIds, i+1)
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52 defer r.mutex.Unlock()
54 resp := models.SubscriptionList{}
55 for _, subs := range r.register {
57 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64 if len(r.subIds) > 0 {
66 r.subIds = r.subIds[1:]
67 if _, ok := r.register[subId]; ok == true {
68 r.subIds = append(r.subIds, subId)
69 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71 subs := &Subscription{
78 subs.ReqId.InstanceId = subId
80 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
82 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
87 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
92 for _, subs := range r.register {
93 if subs.IsMergeable(trans, subReqMsg) {
96 // check if there has been race conditions
99 //subs has been set to invalid
100 if subs.valid == false {
104 // If size is zero, entry is to be deleted
105 if subs.EpList.Size() == 0 {
109 // Try to add to endpointlist. Adding fails if endpoint is already in the list
110 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
112 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
117 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
124 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
128 defer r.mutex.Unlock()
131 // Check validity of subscription action types
133 actionType, err := r.CheckActionTypes(subReqMsg)
135 xapp.Logger.Debug("CREATE %s", err)
140 // Find possible existing Policy subscription
142 if actionType == e2ap.E2AP_ActionTypePolicy {
143 if subs, ok := r.register[trans.GetSubId()]; ok {
144 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
145 // Update message data to subscription
146 subs.SubReqMsg = subReqMsg
147 subs.SetCachedResponse(nil, true)
152 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
154 subs, err = r.allocateSubs(trans, subReqMsg)
159 } else if endPointFound == true {
160 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
161 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
162 xapp.Logger.Debug("Registry: substable=%v", r.register)
167 // Add to subscription
170 defer subs.mutex.Unlock()
172 epamount := subs.EpList.Size()
176 // Subscription route updates
179 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
180 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
182 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
183 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
189 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
195 r.register[subs.ReqId.InstanceId] = subs
197 xapp.Logger.Debug("CREATE %s", subs.String())
198 xapp.Logger.Debug("Registry: substable=%v", r.register)
202 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
203 var reportFound bool = false
204 var policyFound bool = false
205 var insertFound bool = false
207 for _, acts := range subReqMsg.ActionSetups {
208 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
211 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
214 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
218 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
219 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
221 if reportFound == true {
222 return e2ap.E2AP_ActionTypeReport, nil
224 if policyFound == true {
225 return e2ap.E2AP_ActionTypePolicy, nil
227 if insertFound == true {
228 return e2ap.E2AP_ActionTypeInsert, nil
230 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
233 // TODO: Works with concurrent calls, but check if can be improved
234 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
237 defer r.mutex.Unlock()
239 defer subs.mutex.Unlock()
241 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
242 epamount := subs.EpList.Size()
243 subId := subs.ReqId.InstanceId
245 if delStatus == false {
250 if waitRouteClean > 0 {
251 time.Sleep(waitRouteClean)
255 defer subs.mutex.Unlock()
256 xapp.Logger.Info("CLEAN %s", subs.String())
260 // Subscription route delete
262 tmpList := xapp.RmrEndpointList{}
263 tmpList.AddEndpoint(trans.GetEndpoint())
264 subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
265 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
268 // Subscription release
271 defer r.mutex.Unlock()
273 if _, ok := r.register[subId]; ok {
274 xapp.Logger.Debug("RELEASE %s", subs.String())
275 delete(r.register, subId)
276 xapp.Logger.Debug("Registry: substable=%v", r.register)
278 r.subIds = append(r.subIds, subId)
280 } else if subs.EpList.Size() > 0 {
282 // Subscription route updates
284 subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
285 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
293 func (r *Registry) GetSubscription(subId uint32) *Subscription {
295 defer r.mutex.Unlock()
296 if _, ok := r.register[subId]; ok {
297 return r.register[subId]
302 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
304 defer r.mutex.Unlock()
305 for _, subId := range subIds {
306 if _, ok := r.register[subId]; ok {
307 return r.register[subId], nil
310 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)