2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// RESTSubscription is the registry's bookkeeping record for one REST
// subscription: the requesting xApp's RMR endpoint, the Meid given at
// creation, the E2 instance ids recorded for it, the mapping from xApp event
// instance ids to E2 event instance ids, and two "request in progress" flags.
type RESTSubscription struct {
	xAppRmrEndPoint  string
	Meid             string
	InstanceIds      []uint32
	xAppIdToE2Id     map[int64]int64
	SubReqOngoing    bool
	SubDelReqOngoing bool
}

// AddE2InstanceId appends instanceId to the list of E2 instance ids.
func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// DeleteE2InstanceId removes the first occurrence of instanceId from the list
// of E2 instance ids. It is a no-op when the id is not present, which also
// covers the empty list.
func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id != instanceId {
			continue
		}
		// Build a fresh slice instead of shifting in place: a caller may be
		// ranging over the old slice while deleting, and an in-place shift
		// would change the elements it has yet to visit.
		kept := make([]uint32, 0, len(r.InstanceIds)-1)
		kept = append(kept, r.InstanceIds[:i]...)
		kept = append(kept, r.InstanceIds[i+1:]...)
		r.InstanceIds = kept
		return
	}
}

// AddXappIdToE2Id records that xAppEventInstanceID maps to e2EventInstanceID.
func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
	// Writing to a nil map panics; create it on first use so a zero-value
	// RESTSubscription is usable.
	if r.xAppIdToE2Id == nil {
		r.xAppIdToE2Id = make(map[int64]int64)
	}
	r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
}

// GetE2IdFromXappIdToE2Id returns the E2 event instance id recorded for
// xAppEventInstanceID, or 0 when no mapping exists.
func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
	return r.xAppIdToE2Id[xAppEventInstanceID]
}

// DeleteXappIdToE2Id removes the mapping for xAppEventInstanceID, if any.
func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
	delete(r.xAppIdToE2Id, xAppEventInstanceID)
}

// SetProcessed clears the SubReqOngoing flag.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
}
// Registry is the in-memory store behind the methods in this file. The fields
// below are the ones declared here; methods in this file additionally use
// r.mutex and r.subIds.
69 type Registry struct {
// register maps a uint32 key to its *Subscription; methods in this file index
// it with a subscription's ReqId.InstanceId.
71 register map[uint32]*Subscription
// rtmgrClient is the client the Route* methods call to create, update and
// delete subscription routes.
73 rtmgrClient *RtmgrClient
// restSubscriptions maps a REST subscription id to its *RESTSubscription.
74 restSubscriptions map[string]*RESTSubscription
77 func (r *Registry) Initialize() {
78 r.register = make(map[uint32]*Subscription)
79 r.restSubscriptions = make(map[string]*RESTSubscription)
82 for i = 1; i < 65535; i++ {
83 r.subIds = append(r.subIds, i)
87 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
89 defer r.mutex.Unlock()
90 newRestSubscription := RESTSubscription{}
91 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
92 newRestSubscription.Meid = *maid
93 newRestSubscription.SubReqOngoing = true
94 newRestSubscription.SubDelReqOngoing = false
95 r.restSubscriptions[*restSubId] = &newRestSubscription
96 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
97 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
98 return &newRestSubscription, nil
101 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
103 defer r.mutex.Unlock()
104 delete(r.restSubscriptions, *restSubId)
105 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
108 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
110 defer r.mutex.Unlock()
111 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
112 // Subscription deletion is not allowed if prosessing subscription request in not ready
113 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
114 if IsDelReqOngoing == true {
115 restSubscription.SubDelReqOngoing = true
117 r.restSubscriptions[restSubId] = restSubscription
118 return restSubscription, nil
120 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
122 return restSubscription, nil
124 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList describing the subscriptions
// currently held in r.register. The registry mutex is released on return via
// the deferred Unlock.
127 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
129 defer r.mutex.Unlock()
// Start from an empty list and add one entry per registered subscription.
131 resp := models.SubscriptionList{}
132 for _, subs := range r.register {
// Each entry carries the subscription's instance id (widened to int64), the
// RanName of its Meid and the string form of its endpoint list.
134 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs reserves a subscription instance id from the free-id pool
// (r.subIds) and builds a new Subscription for the request. It returns an
// error when the pool is empty, when the reserved id is already present in
// r.register, or when the transaction's endpoint cannot be added to the new
// subscription's endpoint list. On the latter two errors the reserved id is
// appended back to the pool.
// NOTE(review): nothing in this function takes r.mutex; the visible caller
// (AssignToSubscription) defers its Unlock — confirm callers always hold it.
140 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
141 if len(r.subIds) > 0 {
// Drop the head of the pool (presumably the id just read into subId — confirm).
143 r.subIds = r.subIds[1:]
// An id that is still registered must not be handed out again; return it to
// the tail of the pool and fail.
144 if _, ok := r.register[subId]; ok == true {
145 r.subIds = append(r.subIds, subId)
146 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
148 subs := &Subscription{
151 SubReqMsg: subReqMsg,
153 RetryFromXapp: false,
157 DoNotWaitSubResp: false,
// The request id is copied from the incoming message; the instance id is the
// one reserved above.
159 subs.ReqId.Id = subReqMsg.RequestId.Id
160 subs.ReqId.InstanceId = subId
// resetTestFlag switches the subscription's DoNotWaitSubResp flag on.
161 if resetTestFlag == true {
162 subs.DoNotWaitSubResp = true
// AddEndpoint reports false when the endpoint could not be added; undo the id
// reservation in that case.
165 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
166 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
167 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when the free-id pool is empty.
171 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that the request can be
// merged into (subs.IsMergeable). For a candidate it checks that the
// subscription is still valid and still has endpoints, then tries to add the
// transaction's endpoint to it. The boolean result distinguishes the two
// outcomes logged below: the requesting endpoint was already present in the
// subscription, or the endpoint was newly merged into it.
// NOTE(review): which boolean value goes with which outcome is decided by
// return statements outside the lines shown; AssignToSubscription names the
// result endPointFound.
174 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
176 for _, subs := range r.register {
177 if subs.IsMergeable(trans, subReqMsg) {
180 // check if there has been race conditions
183 //subs has been set to invalid
184 if subs.valid == false {
188 // If size is zero, entry is to be deleted
189 if subs.EpList.Size() == 0 {
193 // Try to add to endpointlist. Adding fails if endpoint is already in the list
194 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
196 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
201 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the requesting transaction to a subscription.
//
// Steps visible here:
//  1. Validate the request's action types with CheckActionTypes; a validation
//     error is logged and replaced by "E2 content validation failed".
//  2. For Policy requests, reuse the subscription registered under the
//     transaction's sub id if there is one, replacing its stored request
//     message and calling SetCachedResponse(nil, true).
//  3. Otherwise look for a mergeable subscription with findExistingSubs, or
//     allocate a new one with allocateSubs.
//  4. Create or update the subscription route via RouteCreate or
//     RouteCreateUpdate.
//  5. Store the subscription in r.register under its instance id.
//
// Both the registry mutex and the subscription mutex are released through
// deferred Unlock calls.
// NOTE(review): statement order and locking in this function are delicate;
// verify any change against the complete function body.
208 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
212 defer r.mutex.Unlock()
215 // Check validity of subscription action types
217 actionType, err := r.CheckActionTypes(subReqMsg)
// The detailed validation error is only logged; the caller gets a generic one.
219 xapp.Logger.Info("CREATE %s", err)
220 err = fmt.Errorf("E2 content validation failed")
225 // Find possible existing Policy subscription
227 if actionType == e2ap.E2AP_ActionTypePolicy {
228 if subs, ok := r.register[trans.GetSubId()]; ok {
229 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
230 // Update message data to subscription
231 subs.SubReqMsg = subReqMsg
232 subs.SetCachedResponse(nil, true)
// Try to merge into an existing subscription before allocating a new one.
237 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
// Allocation failure is logged in detail and reported as "subscription not allocated".
239 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
240 xapp.Logger.Error("%s", err.Error())
241 err = fmt.Errorf("subscription not allocated")
245 } else if endPointFound == true {
246 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
247 subs.RetryFromXapp = true
248 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
249 c.UpdateCounter(cDuplicateE2SubReq)
254 // Add to subscription
257 defer subs.mutex.Unlock()
// Number of endpoints now attached to the subscription.
259 epamount := subs.EpList.Size()
260 xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
264 // Subscription route updates
// NOTE(review): presumably RouteCreate is used for the first endpoint and
// RouteCreateUpdate otherwise, based on epamount — confirm.
267 err = r.RouteCreate(subs, c)
269 err = r.RouteCreateUpdate(subs, c)
// Roll-back: hand the instance id back to the free-id pool.
275 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
277 // Delete already added endpoint for the request
278 subs.EpList.DelEndpoint(trans.GetEndpoint())
// Make the subscription reachable through the registry.
283 r.register[subs.ReqId.InstanceId] = subs
285 xapp.Logger.Debug("CREATE %s", subs.String())
286 xapp.Logger.Debug("Registry: substable=%v", r.register)
290 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
291 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
292 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
294 c.UpdateCounter(cRouteCreateFail)
295 xapp.Logger.Error("%s", err.Error())
296 err = fmt.Errorf("RTMGR route create failure")
301 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
302 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
303 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
305 c.UpdateCounter(cRouteCreateUpdateFail)
306 xapp.Logger.Error("%s", err.Error())
307 err = fmt.Errorf("RTMGR route update failure")
310 c.UpdateCounter(cMergedSubscriptions)
314 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
315 var reportFound bool = false
316 var policyFound bool = false
317 var insertFound bool = false
319 for _, acts := range subReqMsg.ActionSetups {
320 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
323 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
326 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
330 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
331 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
333 if reportFound == true {
334 return e2ap.E2AP_ActionTypeReport, nil
336 if policyFound == true {
337 return e2ap.E2AP_ActionTypePolicy, nil
339 if insertFound == true {
340 return e2ap.E2AP_ActionTypeInsert, nil
342 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription detaches the transaction's endpoint from subs and
// then cleans up routing and registry state. Visible behaviour:
//   - the endpoint is removed from subs.EpList and the remaining endpoint
//     count and the instance id are captured;
//   - if waitRouteClean is positive, the cleanup sleeps for that duration
//     first;
//   - route cleanup calls RouteDelete, removes the subscription from
//     r.register (when present) and appends its instance id back to r.subIds;
//   - when endpoints remain (subs.EpList.Size() > 0) the route is updated
//     with RouteDeleteUpdate instead.
//
// NOTE(review): both mutexes have more than one deferred Unlock in this
// function, which suggests part of the work runs in a nested function or
// goroutine — confirm the scoping and lock order against the full body before
// changing anything here.
345 // TODO: Works with concurrent calls, but check if can be improved
346 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
349 defer r.mutex.Unlock()
351 defer subs.mutex.Unlock()
// Remove the endpoint, then snapshot the endpoint count and the instance id.
353 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
354 epamount := subs.EpList.Size()
355 subId := subs.ReqId.InstanceId
// delStatus is false when DelEndpoint did not remove the endpoint.
357 if delStatus == false {
// Optional grace period before the route is cleaned up.
362 if waitRouteClean > 0 {
363 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
364 time.Sleep(waitRouteClean)
368 defer subs.mutex.Unlock()
369 xapp.Logger.Info("CLEAN %s", subs.String())
373 // Subscription route delete
375 r.RouteDelete(subs, trans, c)
378 // Subscription release
381 defer r.mutex.Unlock()
// Drop the subscription from the registry if it is still there.
383 if _, ok := r.register[subId]; ok {
384 xapp.Logger.Debug("RELEASE %s", subs.String())
385 delete(r.register, subId)
386 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Hand the instance id back to the free-id pool.
388 r.subIds = append(r.subIds, subId)
389 } else if subs.EpList.Size() > 0 {
391 // Subscription route update
393 r.RouteDeleteUpdate(subs, c)
400 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
401 tmpList := xapp.RmrEndpointList{}
402 tmpList.AddEndpoint(trans.GetEndpoint())
403 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
404 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
405 c.UpdateCounter(cRouteDeleteFail)
409 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
410 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
411 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
412 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb brings the persisted copy of subs in line with its
// in-memory state: a subscription that is being deleted is removed from the
// database, while one that still has endpoints is written back and the
// cUnmergedSubscriptions counter is bumped. Both the registry mutex and the
// subscription mutex are released on return via deferred Unlock calls.
416 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
418 defer r.mutex.Unlock()
420 defer subs.mutex.Unlock()
// Number of endpoints still attached to the subscription.
422 epamount := subs.EpList.Size()
// NOTE(review): presumably this lookup is only reached when epamount is zero — confirm.
424 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
425 // Not merged subscription is being deleted
426 c.RemoveSubscriptionFromDb(subs)
429 } else if subs.EpList.Size() > 0 {
430 // Endpoint of merged subscription is being deleted
431 c.WriteSubscriptionToDb(subs)
432 c.UpdateCounter(cUnmergedSubscriptions)
436 func (r *Registry) GetSubscription(subId uint32) *Subscription {
438 defer r.mutex.Unlock()
439 if _, ok := r.register[subId]; ok {
440 return r.register[subId]
445 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
447 defer r.mutex.Unlock()
448 for _, subId := range subIds {
449 if _, ok := r.register[subId]; ok {
450 return r.register[subId], nil
453 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)