2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 //-----------------------------------------------------------------------------
35 //-----------------------------------------------------------------------------
// RESTSubscription is the per-REST-subscription record stored in Registry.restSubscriptions.
// NOTE(review): only part of the field list is visible in this listing. Methods in this file
// also read or write InstanceIds, lastReqMd5sum, SubReqOngoing, SubDelReqOngoing and Meid.
37 type RESTSubscription struct {
// Set from the xAppRmrEndPoint argument of CreateRESTSubscription.
38 xAppRmrEndPoint string
// Lookup from an xApp event instance id to the corresponding E2 event instance id.
41 xAppIdToE2Id map[int64]int64
// AddE2InstanceId scans r.InstanceIds and then appends instanceId to it.
// NOTE(review): the loop body is not visible in this listing; presumably it returns early
// when instanceId is already stored so that ids stay unique — confirm against the full file.
47 func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
49 for _, v := range r.InstanceIds {
56 r.InstanceIds = append(r.InstanceIds, instanceId)
59 func (r *RESTSubscription) AddMd5Sum(md5sum string) {
61 r.lastReqMd5sum = md5sum
63 xapp.Logger.Error("EMPTY md5sum attempted to be add to subscrition")
67 func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
68 r.InstanceIds = r.InstanceIds[1:]
71 func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
72 r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
75 func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
76 return r.xAppIdToE2Id[xAppEventInstanceID]
79 func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
80 delete(r.xAppIdToE2Id, xAppEventInstanceID)
// SetProcessed clears the SubReqOngoing flag of the subscription.
// NOTE(review): the remainder of the body, including how err is used, is not visible in
// this listing — confirm against the full file before relying on this description.
83 func (r *RESTSubscription) SetProcessed(err error) {
84 r.SubReqOngoing = false
// Registry keeps the E2 subscriptions and the REST subscriptions known to this process.
// NOTE(review): only part of the field list is visible in this listing; the methods below
// also use r.mutex and r.subIds.
90 type Registry struct {
// E2 subscriptions, keyed by subs.ReqId.InstanceId (see AssignToSubscription).
92 register map[uint32]*Subscription
// Client used by the Route* methods to create, update and delete routes.
94 rtmgrClient *RtmgrClient
// REST subscriptions, keyed by restSubId (see CreateRESTSubscription).
95 restSubscriptions map[string]*RESTSubscription
98 func (r *Registry) Initialize() {
99 r.register = make(map[uint32]*Subscription)
100 r.restSubscriptions = make(map[string]*RESTSubscription)
103 for i = 1; i < 65535; i++ {
104 r.subIds = append(r.subIds, i)
108 func (r *Registry) GetAllRestSubscriptions() []byte {
110 defer r.mutex.Unlock()
111 restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
113 xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
115 return restSubscriptionsJson
118 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
120 defer r.mutex.Unlock()
121 newRestSubscription := RESTSubscription{}
122 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
123 newRestSubscription.Meid = *maid
124 newRestSubscription.SubReqOngoing = true
125 newRestSubscription.SubDelReqOngoing = false
126 r.restSubscriptions[*restSubId] = &newRestSubscription
127 newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
128 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
129 return &newRestSubscription, nil
132 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
134 defer r.mutex.Unlock()
135 delete(r.restSubscriptions, *restSubId)
136 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
139 func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
141 defer r.mutex.Unlock()
142 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
143 // Subscription deletion is not allowed if prosessing subscription request in not ready
144 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
145 if IsDelReqOngoing == true {
146 restSubscription.SubDelReqOngoing = true
148 r.restSubscriptions[restSubId] = restSubscription
149 return restSubscription, nil
151 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
153 return restSubscription, nil
155 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList from the E2 subscriptions in r.register.
// Each entry carries the subscription's instance id, its RAN name and its client endpoints.
// NOTE(review): several lines of the body (around the append and the final return) are not
// visible in this listing.
158 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
160 defer r.mutex.Unlock()
162 resp := models.SubscriptionList{}
// Map iteration order is unspecified in Go, so the order of entries in resp is too.
163 for _, subs := range r.register {
165 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs reserves a subscription id from r.subIds and builds a new Subscription for
// subReqMsg with the requesting endpoint of trans added to its endpoint list.
// It returns an error when no free id is left, when the reserved id is already present in
// r.register, or when the endpoint could not be added; in the latter two cases the id is
// appended back to r.subIds.
// NOTE(review): several lines (how subId is picked, most Subscription fields, the success
// return) are not visible in this listing. The method does not lock r.mutex in the visible
// lines; presumably callers hold it — confirm.
171 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
172 if len(r.subIds) > 0 {
// Drop the head of the free-id pool.
174 r.subIds = r.subIds[1:]
175 if _, ok := r.register[subId]; ok == true {
// Id is unexpectedly in use: put it back at the tail of the pool and give up.
176 r.subIds = append(r.subIds, subId)
177 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
179 subs := &Subscription{
182 SubReqMsg: subReqMsg,
184 RetryFromXapp: false,
188 DoNotWaitSubResp: false,
// Request id comes from the message; the instance id is the one reserved above.
190 subs.ReqId.Id = subReqMsg.RequestId.Id
191 subs.ReqId.InstanceId = subId
192 if resetTestFlag == true {
193 subs.DoNotWaitSubResp = true
196 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
// Adding the endpoint failed: release the reserved id again.
197 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
198 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
202 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs searches r.register for a subscription that subReqMsg can be merged into
// (per subs.IsMergeable) and tries to add the requesting endpoint of trans to it.
// NOTE(review): the return statements and the locking around each candidate are not visible
// in this listing. From AssignToSubscription, the bool result appears to report that the
// requesting endpoint was already present in the found subscription — confirm.
205 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
207 for _, subs := range r.register {
208 if subs.IsMergeable(trans, subReqMsg) {
211 // check if there has been race conditions
214 //subs has been set to invalid
215 if subs.valid == false {
219 // If size is zero, entry is to be deleted
220 if subs.EpList.Size() == 0 {
224 // Try to add to endpointlist. Adding fails if endpoint is already in the list
225 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
227 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
232 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the request carried by trans/subReqMsg to a subscription.
// Visible steps, in order:
//  1. validate the action types of subReqMsg (CheckActionTypes);
//  2. for Policy actions, look for an existing subscription under trans.GetSubId() and
//     refresh its request message and cached response;
//  3. otherwise look for a mergeable subscription (findExistingSubs) and, failing that,
//     allocate a new one (allocateSubs);
//  4. create or update the route for the subscription;
//  5. store the subscription in r.register under its instance id.
//
// NOTE(review): many lines (lock calls, the conditions selecting each branch, the returns)
// are not visible in this listing, so the exact branching cannot be confirmed from here.
239 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
243 defer r.mutex.Unlock()
246 // Check validity of subscription action types
248 actionType, err := r.CheckActionTypes(subReqMsg)
// The detailed validation error is only logged; a generic error replaces it.
250 xapp.Logger.Info("CREATE %s", err)
251 err = fmt.Errorf("E2 content validation failed")
256 // Find possible existing Policy subscription
258 if actionType == e2ap.E2AP_ActionTypePolicy {
259 if subs, ok := r.register[trans.GetSubId()]; ok {
260 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
261 // Update message data to subscription
262 subs.SubReqMsg = subReqMsg
263 subs.SetCachedResponse(nil, true)
268 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
270 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
// As above: log the specific allocation error, hand back a generic one.
271 xapp.Logger.Error("%s", err.Error())
272 err = fmt.Errorf("subscription not allocated")
276 } else if endPointFound == true {
277 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
278 subs.RetryFromXapp = true
279 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
280 c.UpdateCounter(cDuplicateE2SubReq)
285 // Add to subscription
288 defer subs.mutex.Unlock()
290 epamount := subs.EpList.Size()
291 xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
295 // Subscription route updates
// NOTE(review): presumably epamount selects between creating a route and updating one —
// the condition is not visible here.
298 err = r.RouteCreate(subs, c)
300 err = r.RouteCreateUpdate(subs, c)
// Failure path: return the instance id to the free pool.
306 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
308 // Delete already added endpoint for the request
309 subs.EpList.DelEndpoint(trans.GetEndpoint())
314 r.register[subs.ReqId.InstanceId] = subs
316 xapp.Logger.Debug("CREATE %s", subs.String())
317 xapp.Logger.Debug("Registry: substable=%v", r.register)
321 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
322 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
323 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
325 c.UpdateCounter(cRouteCreateFail)
326 xapp.Logger.Error("%s", err.Error())
327 err = fmt.Errorf("RTMGR route create failure")
// RouteCreateUpdate asks the routing manager client to update the route for the
// subscription's endpoint list and instance id.
// On failure it bumps cRouteCreateUpdateFail, logs the underlying error and replaces it with
// a generic "RTMGR route update failure" error.
// NOTE(review): the return statements are not visible in this listing, so it cannot be
// confirmed from here whether cMergedSubscriptions is counted only on success.
332 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
333 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
334 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
336 c.UpdateCounter(cRouteCreateUpdateFail)
337 xapp.Logger.Error("%s", err.Error())
338 err = fmt.Errorf("RTMGR route update failure")
341 c.UpdateCounter(cMergedSubscriptions)
345 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
346 var reportFound bool = false
347 var policyFound bool = false
348 var insertFound bool = false
350 for _, acts := range subReqMsg.ActionSetups {
351 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
354 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
357 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
361 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
362 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
364 if reportFound == true {
365 return e2ap.E2AP_ActionTypeReport, nil
367 if policyFound == true {
368 return e2ap.E2AP_ActionTypePolicy, nil
370 if insertFound == true {
371 return e2ap.E2AP_ActionTypeInsert, nil
373 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription detaches the requesting endpoint of trans from subs.
// Visible steps: delete the endpoint from subs.EpList, optionally sleep waitRouteClean to
// let route cleanup settle, delete the route for the endpoint, then either release the
// subscription (remove it from r.register and return its id to r.subIds) or, when endpoints
// remain, update the route for the remaining ones.
// NOTE(review): r.mutex and subs.mutex are each unlocked by two separate defers, which
// suggests part of the body runs in a nested function; the lock calls, that nesting and the
// branch conditions are not visible in this listing — confirm before changing lock order.
376 // TODO: Works with concurrent calls, but check if can be improved
377 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
380 defer r.mutex.Unlock()
382 defer subs.mutex.Unlock()
// Snapshot the endpoint count and id right after the delete.
384 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
385 epamount := subs.EpList.Size()
386 subId := subs.ReqId.InstanceId
388 if delStatus == false {
393 if waitRouteClean > 0 {
394 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
395 time.Sleep(waitRouteClean)
399 defer subs.mutex.Unlock()
400 xapp.Logger.Info("CLEAN %s", subs.String())
404 // Subscription route delete
406 r.RouteDelete(subs, trans, c)
409 // Subscription release
412 defer r.mutex.Unlock()
414 if _, ok := r.register[subId]; ok {
415 xapp.Logger.Debug("RELEASE %s", subs.String())
416 delete(r.register, subId)
417 xapp.Logger.Debug("Registry: substable=%v", r.register)
// The id goes back to the tail of the free pool.
419 r.subIds = append(r.subIds, subId)
420 } else if subs.EpList.Size() > 0 {
422 // Subscription route update
424 r.RouteDeleteUpdate(subs, c)
431 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
432 tmpList := xapp.RmrEndpointList{}
433 tmpList.AddEndpoint(trans.GetEndpoint())
434 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
435 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
436 c.UpdateCounter(cRouteDeleteFail)
440 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
441 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
442 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
443 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb brings the stored copy of subs in line with its endpoint list:
// one branch removes the subscription from the database, the other (endpoints remaining)
// writes it back and bumps the cUnmergedSubscriptions counter.
// NOTE(review): the lock calls and the outer condition that uses epamount are not visible
// in this listing — confirm the exact branch conditions against the full file.
447 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
449 defer r.mutex.Unlock()
451 defer subs.mutex.Unlock()
453 epamount := subs.EpList.Size()
455 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
456 // Not merged subscription is being deleted
457 c.RemoveSubscriptionFromDb(subs)
460 } else if subs.EpList.Size() > 0 {
461 // Endpoint of merged subscription is being deleted
462 c.WriteSubscriptionToDb(subs)
463 c.UpdateCounter(cUnmergedSubscriptions)
467 func (r *Registry) GetSubscription(subId uint32) *Subscription {
469 defer r.mutex.Unlock()
470 if _, ok := r.register[subId]; ok {
471 return r.register[subId]
476 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
478 defer r.mutex.Unlock()
479 for _, subId := range subIds {
480 if _, ok := r.register[subId]; ok {
481 return r.register[subId], nil
484 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)