2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// RESTSubscription is the registry's record of one REST subscription: the
// requesting endpoint, the subscription instance ids attached to it and
// whether a request or a delete request for it is currently being processed.
type RESTSubscription struct {
	xAppRmrEndPoint  string   // endpoint passed to Registry.CreateRESTSubscription
	Meid             string   // value of the maid argument of Registry.CreateRESTSubscription
	InstanceIds      []uint32 // subscription instance ids attached via AddInstanceId
	SubReqOngoing    bool     // true from creation until SetProcessed is called
	SubDelReqOngoing bool     // set by Registry.GetRESTSubscription when it hands the entry out
}

// AddInstanceId appends instanceId to the list of subscription instance ids
// that belong to this REST subscription.
func (r *RESTSubscription) AddInstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// SetProcessed marks the subscription request as no longer ongoing.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
}

// DeleteInstanceId removes the first occurrence of instanceId from
// InstanceIds. It is a no-op when the id is not present, including when the
// list is empty.
//
// The backing array is never modified in place: removing the head reslices,
// removing any other element builds a new slice. Callers may therefore call
// this while ranging over a previously read r.InstanceIds without having
// elements shift underneath the loop.
func (r *RESTSubscription) DeleteInstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id != instanceId {
			continue
		}
		if i == 0 {
			r.InstanceIds = r.InstanceIds[1:]
			return
		}
		remaining := make([]uint32, 0, len(r.InstanceIds)-1)
		remaining = append(remaining, r.InstanceIds[:i]...)
		remaining = append(remaining, r.InstanceIds[i+1:]...)
		r.InstanceIds = remaining
		return
	}
}
// Registry is the in-memory bookkeeping of subscriptions and REST
// subscriptions.
// NOTE(review): the methods of this type also use r.mutex and r.subIds; their
// field declarations are not among the lines shown here - confirm their types
// against the full file.
56 type Registry struct {
// register maps a subscription instance id (Subscription.ReqId.InstanceId) to
// its Subscription.
58 register map[uint32]*Subscription
// rtmgrClient is the client the Route* methods use to create, update and
// delete subscription routes.
60 rtmgrClient *RtmgrClient
// restSubscriptions maps a REST subscription id to its RESTSubscription.
61 restSubscriptions map[string]*RESTSubscription
64 func (r *Registry) Initialize() {
65 r.register = make(map[uint32]*Subscription)
66 r.restSubscriptions = make(map[string]*RESTSubscription)
69 for i = 1; i < 65535; i++ {
70 r.subIds = append(r.subIds, i)
74 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
76 defer r.mutex.Unlock()
77 newRestSubscription := RESTSubscription{}
78 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
79 newRestSubscription.Meid = *maid
80 newRestSubscription.SubReqOngoing = true
81 newRestSubscription.SubDelReqOngoing = false
82 r.restSubscriptions[*restSubId] = &newRestSubscription
83 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apCubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
84 return &newRestSubscription, nil
87 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
89 defer r.mutex.Unlock()
90 delete(r.restSubscriptions, *restSubId)
91 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
94 func (r *Registry) GetRESTSubscription(restSubId string) (*RESTSubscription, error) {
96 defer r.mutex.Unlock()
97 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
98 // Subscription deletion is not allowed if prosessing subscription request in not ready
99 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
100 restSubscription.SubDelReqOngoing = true
101 r.restSubscriptions[restSubId] = restSubscription
102 return restSubscription, nil
104 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
106 return restSubscription, nil
108 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList with one entry per
// subscription held in r.register. Each entry carries the subscription
// instance id, the RAN name of its Meid and its endpoint list as strings.
// NOTE(review): the return statement and any statements around the append are
// not among the lines shown here - confirm against the full file.
111 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
113 defer r.mutex.Unlock()
115 resp := models.SubscriptionList{}
// Go does not define map iteration order, so the order of entries in resp may
// differ between calls.
116 for _, subs := range r.register {
118 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs takes a subscription instance id out of the r.subIds pool and
// builds a new Subscription for subReqMsg whose endpoint list contains the
// endpoint of trans.
//
// Errors visible here: the picked id is already present in r.register, the
// endpoint cannot be added to the new subscription, or no free id is left.
// In the first two cases the id is appended back to r.subIds.
// When resetTestFlag is true, DoNotWaitSubResp is set on the new subscription.
//
// The function does not lock r.mutex itself; AssignToSubscription calls it
// while holding that lock.
// NOTE(review): several statements (the binding of subId, further fields of
// the Subscription literal, the success return) are not among the lines shown
// here - confirm against the full file.
124 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
125 if len(r.subIds) > 0 {
// Drop the head of the pool; presumably subId was read from r.subIds[0] just
// before this line - TODO confirm.
127 r.subIds = r.subIds[1:]
128 if _, ok := r.register[subId]; ok == true {
// The id is still in use: put it back at the tail of the pool and give up.
129 r.subIds = append(r.subIds, subId)
130 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
132 subs := &Subscription{
135 SubReqMsg: subReqMsg,
137 RetryFromXapp: false,
141 DoNotWaitSubResp: false,
144 subs.ReqId.InstanceId = subId
145 if resetTestFlag == true {
146 subs.DoNotWaitSubResp = true
149 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
// The subscription is discarded, so its id goes back to the pool.
150 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
151 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
155 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that
// subs.IsMergeable(trans, subReqMsg) accepts and tries to add the endpoint of
// trans to it.
//
// Candidates that have been marked invalid or whose endpoint list is empty are
// checked for explicitly below.
// NOTE(review): the return statements and what happens to such candidates are
// not among the lines shown here. The bool result is presumably "requesting
// endpoint was already present" (AssignToSubscription names it endPointFound)
// - confirm against the full file.
158 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
160 for _, subs := range r.register {
161 if subs.IsMergeable(trans, subReqMsg) {
164 // check if there has been race conditions
167 //subs has been set to invalid
168 if subs.valid == false {
172 // If size is zero, entry is to be deleted
173 if subs.EpList.Size() == 0 {
177 // Try to add to endpointlist. Adding fails if endpoint is already in the list
178 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
180 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
185 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription finds or creates the Subscription that serves the
// request carried by trans and subReqMsg, sets up its route and stores it in
// r.register under its instance id.
//
// Steps visible here:
//   - CheckActionTypes validates the action types of subReqMsg.
//   - For a Policy request, a subscription already stored under
//     trans.GetSubId() gets its SubReqMsg replaced and
//     SetCachedResponse(nil, true) called on it.
//   - findExistingSubs looks for a mergeable subscription; allocateSubs
//     creates a new one; when the requesting endpoint was already present,
//     RetryFromXapp is set on the existing subscription.
//   - The route is created with RouteCreate or updated with
//     RouteCreateUpdate.
//   - On the failure path the instance id is appended back to r.subIds and
//     the endpoint added for this request is removed again.
//
// NOTE(review): the conditions that select between these branches and all
// return statements are not among the lines shown here - confirm against the
// full file.
192 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
196 defer r.mutex.Unlock()
199 // Check validity of subscription action types
201 actionType, err := r.CheckActionTypes(subReqMsg)
203 xapp.Logger.Debug("CREATE %s", err)
208 // Find possible existing Policy subscription
210 if actionType == e2ap.E2AP_ActionTypePolicy {
211 if subs, ok := r.register[trans.GetSubId()]; ok {
212 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
213 // Update message data to subscription
214 subs.SubReqMsg = subReqMsg
215 subs.SetCachedResponse(nil, true)
220 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
222 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
226 } else if endPointFound == true {
227 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
228 subs.RetryFromXapp = true
229 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
234 // Add to subscription
// From here on the subscription's own mutex is held as well as r.mutex.
237 defer subs.mutex.Unlock()
239 epamount := subs.EpList.Size()
240 xapp.Logger.Info("AssignToSubscription subs.EpList.Size() = %v", subs.EpList.Size())
244 // Subscription route updates
247 err = r.RouteCreate(subs, c)
249 err = r.RouteCreateUpdate(subs, c)
// Give the instance id back to the pool.
255 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
257 // Delete already added endpoint for the request
258 subs.EpList.DelEndpoint(trans.GetEndpoint())
263 r.register[subs.ReqId.InstanceId] = subs
265 xapp.Logger.Debug("CREATE %s", subs.String())
266 xapp.Logger.Debug("Registry: substable=%v", r.register)
270 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
271 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
272 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
274 c.UpdateCounter(cRouteCreateFail)
279 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
280 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
281 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
283 c.UpdateCounter(cRouteCreateUpdateFail)
286 c.UpdateCounter(cMergedSubscriptions)
290 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
291 var reportFound bool = false
292 var policyFound bool = false
293 var insertFound bool = false
295 for _, acts := range subReqMsg.ActionSetups {
296 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
299 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
302 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
306 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
307 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
309 if reportFound == true {
310 return e2ap.E2AP_ActionTypeReport, nil
312 if policyFound == true {
313 return e2ap.E2AP_ActionTypePolicy, nil
315 if insertFound == true {
316 return e2ap.E2AP_ActionTypeInsert, nil
318 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription removes the endpoint of trans from subs and then
// cleans up: the route is deleted, and either the subscription is released
// (removed from r.register, its id appended back to r.subIds) or, when
// endpoints remain, the route is updated with RouteDeleteUpdate.
//
// waitRouteClean > 0 makes the cleanup sleep for that duration first.
//
// NOTE(review): r.mutex and subs.mutex are each deferred-unlocked more than
// once below, which suggests part of the cleanup runs in a nested function
// literal (possibly a goroutine) whose opening line is not among the lines
// shown here. The branch conditions and return statements are not shown
// either - confirm against the full file.
321 // TODO: Works with concurrent calls, but check if can be improved
322 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
325 defer r.mutex.Unlock()
327 defer subs.mutex.Unlock()
// Capture the state right after the removal; epamount and subId are the values
// the later cleanup works with.
329 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
330 epamount := subs.EpList.Size()
331 subId := subs.ReqId.InstanceId
// delStatus == false means DelEndpoint did not remove the endpoint.
333 if delStatus == false {
338 if waitRouteClean > 0 {
339 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
340 time.Sleep(waitRouteClean)
344 defer subs.mutex.Unlock()
345 xapp.Logger.Info("CLEAN %s", subs.String())
349 // Subscription route delete
351 r.RouteDelete(subs, trans, c)
354 // Subscription release
357 defer r.mutex.Unlock()
359 if _, ok := r.register[subId]; ok {
360 xapp.Logger.Debug("RELEASE %s", subs.String())
361 delete(r.register, subId)
362 xapp.Logger.Debug("Registry: substable=%v", r.register)
// The released id becomes available for new subscriptions again.
364 r.subIds = append(r.subIds, subId)
365 } else if subs.EpList.Size() > 0 {
367 // Subscription route update
369 r.RouteDeleteUpdate(subs, c)
376 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
377 tmpList := xapp.RmrEndpointList{}
378 tmpList.AddEndpoint(trans.GetEndpoint())
379 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
380 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
381 c.UpdateCounter(cRouteDeleteFail)
385 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
386 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
387 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
388 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb brings the stored copy of subs in line with its
// endpoint list while holding both r.mutex and subs.mutex: a subscription that
// is still in r.register is removed from the db in the first branch, and one
// that still has endpoints is written to the db and counted as unmerged in the
// second.
// NOTE(review): the condition that opens the first branch is not among the
// lines shown here; given the "else if ... > 0" below it presumably tests
// epamount == 0 - confirm against the full file.
392 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
394 defer r.mutex.Unlock()
396 defer subs.mutex.Unlock()
398 epamount := subs.EpList.Size()
400 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
401 // Not merged subscription is being deleted
402 c.RemoveSubscriptionFromDb(subs)
405 } else if subs.EpList.Size() > 0 {
406 // Endpoint of merged subscription is being deleted
407 c.WriteSubscriptionToDb(subs)
408 c.UpdateCounter(cUnmergedSubscriptions)
412 func (r *Registry) GetSubscription(subId uint32) *Subscription {
414 defer r.mutex.Unlock()
415 if _, ok := r.register[subId]; ok {
416 return r.register[subId]
421 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
423 defer r.mutex.Unlock()
424 for _, subId := range subIds {
425 if _, ok := r.register[subId]; ok {
426 return r.register[subId], nil
429 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)