2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// RESTSubscription is the bookkeeping record for one subscription created
// through the REST interface.
//
// NOTE(review): the field list is reconstructed from the usages visible in
// this file (Meid, InstanceIds, SubReqOngoing and SubDelReqOngoing are
// referenced by the methods below and by Registry) - confirm that no further
// fields are expected by other files in the package.
type RESTSubscription struct {
	// xAppRmrEndPoint is the RMR endpoint of the xApp that owns the subscription.
	xAppRmrEndPoint string
	// Meid is the managed-element id given when the subscription was created.
	Meid string
	// InstanceIds lists the E2 subscription instance ids attached to this record.
	InstanceIds []uint32
	// xAppIdToE2Id maps an xApp event instance id to its E2 event instance id.
	xAppIdToE2Id map[int64]int64
	// SubReqOngoing is true while the subscription request is being processed.
	SubReqOngoing bool
	// SubDelReqOngoing is true once a delete request has claimed the record.
	SubDelReqOngoing bool
}

// AddE2InstanceId appends instanceId to the list of E2 instance ids.
func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// DeleteE2InstanceId removes the first occurrence of instanceId from the
// list of E2 instance ids. It is a no-op when the id is not present, which
// also makes it safe to call on an empty list.
func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id == instanceId {
			r.InstanceIds = append(r.InstanceIds[:i], r.InstanceIds[i+1:]...)
			return
		}
	}
}

// AddXappIdToE2Id records that xAppEventInstanceID corresponds to
// e2EventInstanceID, replacing any earlier mapping for the same xApp id.
func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
	// Writing to a nil map panics; allocate lazily so a zero-value
	// RESTSubscription is usable.
	if r.xAppIdToE2Id == nil {
		r.xAppIdToE2Id = make(map[int64]int64)
	}
	r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
}

// GetE2IdFromXappIdToE2Id returns the E2 event instance id recorded for
// xAppEventInstanceID, or 0 when no mapping exists.
func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
	return r.xAppIdToE2Id[xAppEventInstanceID]
}

// DeleteXappIdToE2Id removes the mapping for xAppEventInstanceID, if any.
func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
	delete(r.xAppIdToE2Id, xAppEventInstanceID)
}

// SetProcessed marks the subscription request as no longer in progress.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
}
// Registry keeps the in-memory tables of E2 subscriptions and REST
// subscriptions together with the route-manager client used to maintain
// their routes.
// NOTE(review): the embedded line numbers jump (71 -> 73 -> 75 -> 76), so some
// field declarations and the closing brace appear to be missing from this
// extract. Methods in this file also use r.mutex and r.subIds, which are not
// declared in the visible lines - TODO restore them from the original file.
71 type Registry struct {
// register holds the E2 subscriptions keyed by subscription instance id.
73 register map[uint32]*Subscription
// rtmgrClient issues the route create/update/delete requests.
75 rtmgrClient *RtmgrClient
// restSubscriptions holds the REST subscriptions keyed by REST subscription id.
76 restSubscriptions map[string]*RESTSubscription
79 func (r *Registry) Initialize() {
80 r.register = make(map[uint32]*Subscription)
81 r.restSubscriptions = make(map[string]*RESTSubscription)
84 for i = 1; i < 65535; i++ {
85 r.subIds = append(r.subIds, i)
89 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
91 defer r.mutex.Unlock()
92 newRestSubscription := RESTSubscription{}
93 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
94 newRestSubscription.Meid = *maid
95 newRestSubscription.SubReqOngoing = true
96 newRestSubscription.SubDelReqOngoing = false
97 r.restSubscriptions[*restSubId] = &newRestSubscription
98 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
99 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
100 return &newRestSubscription, nil
103 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
105 defer r.mutex.Unlock()
106 delete(r.restSubscriptions, *restSubId)
107 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
110 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
112 defer r.mutex.Unlock()
113 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
114 // Subscription deletion is not allowed if prosessing subscription request in not ready
115 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
116 if IsDelReqOngoing == true {
117 restSubscription.SubDelReqOngoing = true
119 r.restSubscriptions[restSubId] = restSubscription
120 return restSubscription, nil
122 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
124 return restSubscription, nil
126 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList containing one
// models.SubscriptionData entry (instance id, RAN name and client endpoint
// list) for every subscription in r.register.
// NOTE(review): the embedded line numbers skip 130, 132, 135 and 137-140, so
// the lock acquisition that pairs with the deferred Unlock, any statements
// around the append, the return statement and the closing braces are not
// visible in this extract - TODO restore from the original file.
129 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
131 defer r.mutex.Unlock()
133 resp := models.SubscriptionList{}
134 for _, subs := range r.register {
// The instance id is widened to int64 for the REST model.
136 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs builds a new Subscription for subReqMsg using an id taken from
// the free-id pool r.subIds and adds the requesting endpoint of trans to it.
// It returns an error when the pool is empty, when the id is already present
// in r.register, or when the endpoint cannot be added; in the latter two
// cases the id is appended back to the pool.
// NOTE(review): the embedded line numbers skip 144, 149, 151-152, 154,
// 156-158, 160, 165-166, 170-172 and 174. The definition of subId
// (presumably the head of r.subIds), several Subscription literal fields,
// the success return and the closing braces are not visible - TODO restore.
142 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
143 if len(r.subIds) > 0 {
// Drop the head of the pool.
145 r.subIds = r.subIds[1:]
// An id that is still registered must not be reused: put it back and fail.
146 if _, ok := r.register[subId]; ok == true {
147 r.subIds = append(r.subIds, subId)
148 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
150 subs := &Subscription{
153 SubReqMsg: subReqMsg,
155 RetryFromXapp: false,
159 DoNotWaitSubResp: false,
// The request id comes from the message; the instance id is the pooled id.
161 subs.ReqId.Id = subReqMsg.RequestId.Id
162 subs.ReqId.InstanceId = subId
// resetTestFlag switches the subscription to not waiting for the response.
163 if resetTestFlag == true {
164 subs.DoNotWaitSubResp = true
// AddEndpoint reports false when the endpoint could not be added; the id is
// returned to the pool in that case.
167 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
168 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
169 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when the pool had no ids left.
173 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that
// subs.IsMergeable reports as mergeable with the request carried by trans and
// subReqMsg, and tries to add the requesting endpoint to it.
// NOTE(review): the embedded line numbers skip most of the branch bodies
// (180-181, 183-184, 187-189, 192-194, 197, 199-202, 204-208), so the locking,
// the handling of each check and every return statement are not visible. The
// meaning of the bool result (presumably "endpoint was already present", as
// the caller names it endPointFound) cannot be confirmed from here - TODO
// restore from the original file.
176 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
178 for _, subs := range r.register {
179 if subs.IsMergeable(trans, subReqMsg) {
182 // Check whether a race has changed the candidate since it was matched
185 // The subscription has been marked invalid
186 if subs.valid == false {
190 // If size is zero, the entry is about to be deleted
191 if subs.EpList.Size() == 0 {
195 // Try to add to the endpoint list. Adding fails if the endpoint is already in the list
196 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
198 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
203 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the request carried by trans and subReqMsg to
// a subscription: an existing Policy subscription, an existing mergeable
// subscription, or a newly allocated one. It then creates or updates the
// subscription's route and stores the subscription in r.register.
// NOTE(review): the embedded line numbers skip many lines (211-213, 215-216,
// 218, 220, 223-226, 228, 235-238, 240, 244-246, 252-255, 257-258, 260,
// 263-265, 267-268, 270, 272-276, 278, 281-284, 286, 289-290). The error
// guards, the conditions choosing between branches, the lock acquisitions
// paired with the deferred Unlocks and all return statements are not
// visible - TODO restore from the original file.
210 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
214 defer r.mutex.Unlock()
217 // Check validity of subscription action types
219 actionType, err := r.CheckActionTypes(subReqMsg)
// The detailed validation error is logged and replaced by a generic one
// (presumably only when err is non-nil; the guard is not visible).
221 xapp.Logger.Info("CREATE %s", err)
222 err = fmt.Errorf("E2 content validation failed")
227 // Find possible existing Policy subscription
229 if actionType == e2ap.E2AP_ActionTypePolicy {
230 if subs, ok := r.register[trans.GetSubId()]; ok {
231 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
232 // Update message data to subscription
233 subs.SubReqMsg = subReqMsg
234 subs.SetCachedResponse(nil, true)
// Look for a mergeable subscription; endPointFound is the bool result of
// findExistingSubs.
239 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
// A new subscription is allocated (presumably when none was found); the
// allocation error is logged and replaced by a generic one.
241 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
242 xapp.Logger.Error("%s", err.Error())
243 err = fmt.Errorf("subscription not allocated")
247 } else if endPointFound == true {
248 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
249 subs.RetryFromXapp = true
250 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
251 c.UpdateCounter(cDuplicateE2SubReq)
256 // Add to subscription
259 defer subs.mutex.Unlock()
261 epamount := subs.EpList.Size()
262 xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
266 // Subscription route updates
// Either a route create or a route update is issued; the condition selecting
// between them (presumably based on epamount) is not visible.
269 err = r.RouteCreate(subs, c)
271 err = r.RouteCreateUpdate(subs, c)
// The instance id is appended back to the free-id pool (presumably on route
// failure; the guard is not visible).
277 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
279 // Delete already added endpoint for the request
280 subs.EpList.DelEndpoint(trans.GetEndpoint())
// Store the subscription under its instance id.
285 r.register[subs.ReqId.InstanceId] = subs
287 xapp.Logger.Debug("CREATE %s", subs.String())
288 xapp.Logger.Debug("Registry: substable=%v", r.register)
292 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
293 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
294 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
296 c.UpdateCounter(cRouteCreateFail)
297 xapp.Logger.Error("%s", err.Error())
298 err = fmt.Errorf("RTMGR route create failure")
303 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
304 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
305 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
307 c.UpdateCounter(cRouteCreateUpdateFail)
308 xapp.Logger.Error("%s", err.Error())
309 err = fmt.Errorf("RTMGR route update failure")
312 c.UpdateCounter(cMergedSubscriptions)
316 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
317 var reportFound bool = false
318 var policyFound bool = false
319 var insertFound bool = false
321 for _, acts := range subReqMsg.ActionSetups {
322 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
325 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
328 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
332 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
333 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
335 if reportFound == true {
336 return e2ap.E2AP_ActionTypeReport, nil
338 if policyFound == true {
339 return e2ap.E2AP_ActionTypePolicy, nil
341 if insertFound == true {
342 return e2ap.E2AP_ActionTypeInsert, nil
344 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription removes the endpoint of trans from subs. The visible
// code then optionally sleeps for waitRouteClean, deletes the endpoint's
// route, and either releases the subscription (removing it from r.register
// and returning its id to r.subIds) or updates the route of the remaining
// endpoints.
// NOTE(review): the embedded line numbers skip many lines (349-350, 352, 354,
// 358, 360-363, 367-369, 372-374, 376, 378-379, 381-382, 384, 389, 392, 394,
// 396-400). r.mutex and subs.mutex each have two deferred Unlocks, which
// suggests that part of the body runs in a separate scope (presumably a
// closure or goroutine), but its delimiters, the lock acquisitions, the
// branch conditions and the return statements are not visible - TODO restore
// from the original file.
347 // TODO: Works with concurrent calls, but check if can be improved
348 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
351 defer r.mutex.Unlock()
353 defer subs.mutex.Unlock()
// Snapshot the result of the removal, the remaining endpoint count and the
// instance id for use in the cleanup below.
355 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
356 epamount := subs.EpList.Size()
357 subId := subs.ReqId.InstanceId
// DelEndpoint reported that nothing was removed.
359 if delStatus == false {
// Optional delay before the route is cleaned up.
364 if waitRouteClean > 0 {
365 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
366 time.Sleep(waitRouteClean)
370 defer subs.mutex.Unlock()
371 xapp.Logger.Info("CLEAN %s", subs.String())
375 // Subscription route delete
377 r.RouteDelete(subs, trans, c)
380 // Subscription release
383 defer r.mutex.Unlock()
385 if _, ok := r.register[subId]; ok {
386 xapp.Logger.Debug("RELEASE %s", subs.String())
387 delete(r.register, subId)
388 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Return the instance id to the free-id pool.
390 r.subIds = append(r.subIds, subId)
391 } else if subs.EpList.Size() > 0 {
393 // Subscription route update
395 r.RouteDeleteUpdate(subs, c)
402 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
403 tmpList := xapp.RmrEndpointList{}
404 tmpList.AddEndpoint(trans.GetEndpoint())
405 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
406 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
407 c.UpdateCounter(cRouteDeleteFail)
411 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
412 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
413 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
414 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb synchronises the stored copy of subs with its
// in-memory state: the visible code removes the subscription from the
// database in one branch and rewrites it (also incrementing the
// cUnmergedSubscriptions counter) when endpoints remain.
// NOTE(review): the embedded line numbers skip 419, 421, 423, 425, 429-430
// and 435-436. The lock acquisitions paired with the deferred Unlocks, the
// outer condition that uses epamount (presumably "no endpoints left") and
// the closing braces are not visible - TODO restore from the original file.
418 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
420 defer r.mutex.Unlock()
422 defer subs.mutex.Unlock()
// Number of endpoints still attached to the subscription.
424 epamount := subs.EpList.Size()
426 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
427 // Not merged subscription is being deleted
428 c.RemoveSubscriptionFromDb(subs)
431 } else if subs.EpList.Size() > 0 {
432 // Endpoint of merged subscription is being deleted
433 c.WriteSubscriptionToDb(subs)
434 c.UpdateCounter(cUnmergedSubscriptions)
438 func (r *Registry) GetSubscription(subId uint32) *Subscription {
440 defer r.mutex.Unlock()
441 if _, ok := r.register[subId]; ok {
442 return r.register[subId]
447 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
449 defer r.mutex.Unlock()
450 for _, subId := range subIds {
451 if _, ok := r.register[subId]; ok {
452 return r.register[subId], nil
455 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)