2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
27 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
29 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
32 //-----------------------------------------------------------------------------
34 //-----------------------------------------------------------------------------
// RESTSubscription is the registry's record of one subscription created over
// the REST interface, together with the instance ids attached to it.
type RESTSubscription struct {
	xAppRmrEndPoint  string   // endpoint string supplied when the subscription was created
	Meid             string   // managed element id supplied when the subscription was created
	InstanceIds      []uint32 // instance ids attached to this REST subscription
	SubReqOngoing    bool     // true until SetProcessed is called
	SubDelReqOngoing bool     // true once a delete request has claimed the subscription
}

// AddInstanceId appends instanceId to the subscription's instance ids.
func (r *RESTSubscription) AddInstanceId(instanceId uint32) {
	r.InstanceIds = append(r.InstanceIds, instanceId)
}

// SetProcessed marks the subscription request as no longer ongoing.
func (r *RESTSubscription) SetProcessed() {
	r.SubReqOngoing = false
}

// DeleteInstanceId removes the first occurrence of instanceId from the
// subscription's instance ids. It is a no-op when the id is not present,
// including when the list is empty.
func (r *RESTSubscription) DeleteInstanceId(instanceId uint32) {
	for i, id := range r.InstanceIds {
		if id != instanceId {
			continue
		}
		// Build a fresh slice instead of shifting elements in place, so a
		// caller that is still ranging over the previous InstanceIds slice
		// keeps seeing its original contents.
		remaining := make([]uint32, 0, len(r.InstanceIds)-1)
		remaining = append(remaining, r.InstanceIds[:i]...)
		r.InstanceIds = append(remaining, r.InstanceIds[i+1:]...)
		return
	}
}
// Registry is the bookkeeping structure for subscriptions: it holds the
// subscriptions keyed by instance id, the REST subscriptions keyed by REST
// subscription id, and the client used for route requests.
// NOTE(review): the methods of Registry also use r.mutex and r.subIds; the
// declarations of those fields are not visible in this excerpt — confirm
// their types against the full struct definition.
56 type Registry struct {
// Subscriptions keyed by subs.ReqId.InstanceId.
58 register map[uint32]*Subscription
// Client used to send subscription route create/update/delete requests.
60 rtmgrClient *RtmgrClient
// REST subscriptions keyed by REST subscription id.
61 restSubscriptions map[string]*RESTSubscription
64 func (r *Registry) Initialize() {
65 r.register = make(map[uint32]*Subscription)
66 r.restSubscriptions = make(map[string]*RESTSubscription)
69 for i = 1; i < 65535; i++ {
70 r.subIds = append(r.subIds, i)
74 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
76 defer r.mutex.Unlock()
77 newRestSubscription := RESTSubscription{}
78 newRestSubscription.xAppRmrEndPoint = *xAppRmrEndPoint
79 newRestSubscription.Meid = *maid
80 newRestSubscription.SubReqOngoing = true
81 newRestSubscription.SubDelReqOngoing = false
82 r.restSubscriptions[*restSubId] = &newRestSubscription
83 xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
84 return &newRestSubscription, nil
87 func (r *Registry) DeleteRESTSubscription(restSubId *string) {
89 defer r.mutex.Unlock()
90 delete(r.restSubscriptions, *restSubId)
91 xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
94 func (r *Registry) GetRESTSubscription(restSubId string) (*RESTSubscription, error) {
96 defer r.mutex.Unlock()
97 if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
98 // Subscription deletion is not allowed if prosessing subscription request in not ready
99 if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
100 restSubscription.SubDelReqOngoing = true
101 r.restSubscriptions[restSubId] = restSubscription
102 return restSubscription, nil
104 return restSubscription, fmt.Errorf("Registry: REST request is still ongoing for the endpoint=%v, restSubId=%v, SubDelReqOngoing=%v, SubReqOngoing=%v", restSubscription, restSubId, restSubscription.SubDelReqOngoing, restSubscription.SubReqOngoing)
106 return restSubscription, nil
108 return nil, fmt.Errorf("Registry: No valid subscription found with restSubId=%v", restSubId)
// QueryHandler builds a models.SubscriptionList containing one
// models.SubscriptionData entry per subscription in r.register. Each entry
// carries the subscription's instance id, its RAN name and its endpoint list
// as strings.
// NOTE(review): the lock acquisition paired with the deferred unlock, any
// per-subscription locking inside the loop and the final return statement
// are not visible in this excerpt — confirm against the full source.
111 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
// Released when the function returns.
113 defer r.mutex.Unlock()
115 resp := models.SubscriptionList{}
// r.register is a map, so the order of the entries in resp is not stable
// between calls.
116 for _, subs := range r.register {
118 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, ClientEndpoint: subs.EpList.StringList()})
// allocateSubs reserves a subscription id from r.subIds and builds a new
// Subscription for the request, with the transaction's endpoint added to the
// subscription's endpoint list.
//
// It returns an error when no free ids are left, when the reserved id is
// already present in r.register, or when the endpoint cannot be added. In
// the latter two cases the id is appended back to r.subIds.
// NOTE(review): the assignment of subId, most fields of the Subscription
// literal and the success return are not visible in this excerpt. The
// function does not lock r.mutex itself; it is presumably called with the
// lock held (see AssignToSubscription) — confirm.
124 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
125 if len(r.subIds) > 0 {
// Drops the head of the free-id list; subId is presumably that head element
// (its assignment is not visible here).
127 r.subIds = r.subIds[1:]
// An id that is still registered cannot be reused: put it back at the tail
// of the free-id list and fail.
128 if _, ok := r.register[subId]; ok == true {
129 r.subIds = append(r.subIds, subId)
130 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
132 subs := &Subscription{
135 SubReqMsg: subReqMsg,
137 RetryFromXapp: false,
141 DoNotWaitSubResp: false,
144 subs.ReqId.InstanceId = subId
// resetTestFlag switches the new subscription to DoNotWaitSubResp.
145 if resetTestFlag == true {
146 subs.DoNotWaitSubResp = true
// AddEndpoint reporting false is treated as failure: the id is returned to
// the free-id list and no subscription is created.
149 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
150 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
151 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
// Reached when r.subIds is empty.
155 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs scans r.register for a subscription that reports
// IsMergeable for the transaction and request, and tries to add the
// transaction's endpoint to it.
// NOTE(review): the return statements, the locking around the checks and the
// bodies of the validity checks are not visible in this excerpt. Judging
// from the caller (AssignToSubscription), the bool result reports whether
// the requesting endpoint was already present in the returned subscription —
// confirm against the full source.
158 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
160 for _, subs := range r.register {
161 if subs.IsMergeable(trans, subReqMsg) {
164 // check if there has been race conditions
167 //subs has been set to invalid
168 if subs.valid == false {
172 // If size is zero, entry is to be deleted
173 if subs.EpList.Size() == 0 {
177 // Try to add to endpointlist. Adding fails if endpoint is already in the list
178 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
// The endpoint was already in the list of this subscription.
180 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
// The endpoint was added to an existing subscription.
185 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the transaction's endpoint to a subscription
// for the given request. The visible statements cover three cases: updating
// an already registered Policy subscription in place, joining a subscription
// found by findExistingSubs, and allocating a new one with allocateSubs.
// After that it issues a route create or route update request and stores the
// subscription in r.register under its instance id.
// NOTE(review): several branch conditions, the lock acquisitions paired with
// the deferred unlocks and all return statements are not visible in this
// excerpt; the notes below describe only what the visible statements do.
192 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
// Released when the function returns.
196 defer r.mutex.Unlock()
199 // Check validity of subscription action types
// CheckActionTypes returns an error when the request mixes Report, Policy
// and Insert actions, or contains none of them.
201 actionType, err := r.CheckActionTypes(subReqMsg)
203 xapp.Logger.Debug("CREATE %s", err)
208 // Find possible existing Policy subscription
210 if actionType == e2ap.E2AP_ActionTypePolicy {
// A Policy request whose subscription id is already registered reuses that
// subscription.
211 if subs, ok := r.register[trans.GetSubId()]; ok {
212 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
213 // Update message data to subscription
214 subs.SubReqMsg = subReqMsg
215 subs.SetCachedResponse(nil, true)
220 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
// NOTE(review): allocation is presumably attempted only when
// findExistingSubs found nothing — the guarding condition is not visible
// here; confirm.
222 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
226 } else if endPointFound == true {
227 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
228 subs.RetryFromXapp = true
229 xapp.Logger.Debug("CREATE subReqMsg.InstanceId=%v. Same subscription %s already exists.", subReqMsg.InstanceId, subs.String())
230 c.UpdateCounter(cDuplicateE2SubReq)
235 // Add to subscription
// Released when the function returns.
238 defer subs.mutex.Unlock()
240 epamount := subs.EpList.Size()
241 xapp.Logger.Info("AssignToSubscription subs.EpList.Size()=%v", subs.EpList.Size())
245 // Subscription route updates
// NOTE(review): which of RouteCreate / RouteCreateUpdate runs depends on a
// condition that is not visible here (presumably on epamount) — confirm.
248 err = r.RouteCreate(subs, c)
250 err = r.RouteCreateUpdate(subs, c)
// Roll-back steps: the instance id is appended back to the free-id list and
// the endpoint added for this request is removed again.
// NOTE(review): the conditions guarding these steps are not visible here.
256 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
258 // Delete already added endpoint for the request
259 subs.EpList.DelEndpoint(trans.GetEndpoint())
// Stores the subscription under its instance id.
264 r.register[subs.ReqId.InstanceId] = subs
266 xapp.Logger.Debug("CREATE %s", subs.String())
267 xapp.Logger.Debug("Registry: substable=%v", r.register)
// RouteCreate sends a route create request, through r.rtmgrClient, for the
// subscription's endpoint list and instance id.
// NOTE(review): the condition guarding the failure counter and the return
// statement are not visible in this excerpt — presumably the counter is
// updated when err is non-nil and err is returned; confirm.
271 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
// The instance id is narrowed to uint16 for the route request.
272 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
273 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
// Counts a failed route creation.
275 c.UpdateCounter(cRouteCreateFail)
// RouteCreateUpdate sends a route update request, through r.rtmgrClient, for
// the subscription's endpoint list and instance id.
// NOTE(review): the conditions selecting between the two counters and the
// return statements are not visible in this excerpt — presumably the failure
// counter is updated when err is non-nil and the merge counter otherwise;
// confirm.
280 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
// The instance id is narrowed to uint16 for the route request.
281 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
282 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// Counts a failed route update.
284 c.UpdateCounter(cRouteCreateUpdateFail)
// Counts a merged subscription.
287 c.UpdateCounter(cMergedSubscriptions)
291 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
292 var reportFound bool = false
293 var policyFound bool = false
294 var insertFound bool = false
296 for _, acts := range subReqMsg.ActionSetups {
297 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
300 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
303 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
307 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
308 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
310 if reportFound == true {
311 return e2ap.E2AP_ActionTypeReport, nil
313 if policyFound == true {
314 return e2ap.E2AP_ActionTypePolicy, nil
316 if insertFound == true {
317 return e2ap.E2AP_ActionTypeInsert, nil
319 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
// RemoveFromSubscription removes the transaction's endpoint from subs. The
// visible statements then optionally sleep for waitRouteClean and request
// deletion of the removed endpoint's route. After that they either release
// the subscription (delete it from r.register and append its id back to
// r.subIds) or request a route update for the remaining endpoints.
// NOTE(review): the statements that structure these steps (branch
// conditions, any function literals or goroutines, the lock acquisitions
// paired with the deferred unlocks, and the returns) are not visible in this
// excerpt. The repeated deferred unlocks of the same mutexes suggest the
// steps run in more than one function scope — confirm against the full
// source.
322 // TODO: Works with concurrent calls, but check if can be improved
323 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
326 defer r.mutex.Unlock()
328 defer subs.mutex.Unlock()
// Captured right after the removal: whether the endpoint was removed, how
// many endpoints remain, and the instance id.
330 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
331 epamount := subs.EpList.Size()
332 subId := subs.ReqId.InstanceId
// Nothing was removed from the endpoint list.
334 if delStatus == false {
// Optional delay before the route clean-up starts.
339 if waitRouteClean > 0 {
340 xapp.Logger.Debug("Pending %v in order to wait route cleanup", waitRouteClean)
341 time.Sleep(waitRouteClean)
345 defer subs.mutex.Unlock()
346 xapp.Logger.Info("CLEAN %s", subs.String())
350 // Subscription route delete
352 r.RouteDelete(subs, trans, c)
355 // Subscription release
358 defer r.mutex.Unlock()
// Remove the subscription from the register if it is still there.
360 if _, ok := r.register[subId]; ok {
361 xapp.Logger.Debug("RELEASE %s", subs.String())
362 delete(r.register, subId)
363 xapp.Logger.Debug("Registry: substable=%v", r.register)
// The instance id becomes available for reuse.
365 r.subIds = append(r.subIds, subId)
366 } else if subs.EpList.Size() > 0 {
368 // Subscription route update
370 r.RouteDeleteUpdate(subs, c)
377 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
378 tmpList := xapp.RmrEndpointList{}
379 tmpList.AddEndpoint(trans.GetEndpoint())
380 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
381 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
382 c.UpdateCounter(cRouteDeleteFail)
386 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
387 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
388 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
389 c.UpdateCounter(cRouteDeleteUpdateFail)
// UpdateSubscriptionToDb brings the stored copy of subs in line with its
// endpoint list. The visible branches call c.RemoveSubscriptionFromDb when
// the subscription is still present in r.register, or call
// c.WriteSubscriptionToDb and count cUnmergedSubscriptions when endpoints
// remain.
// NOTE(review): the lock acquisitions paired with the deferred unlocks and
// the outer condition that the `else if` belongs to are not visible in this
// excerpt; epamount is presumably tested there — confirm.
393 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
395 defer r.mutex.Unlock()
397 defer subs.mutex.Unlock()
399 epamount := subs.EpList.Size()
401 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
402 // Not merged subscription is being deleted
403 c.RemoveSubscriptionFromDb(subs)
406 } else if subs.EpList.Size() > 0 {
407 // Endpoint of merged subscription is being deleted
408 c.WriteSubscriptionToDb(subs)
409 c.UpdateCounter(cUnmergedSubscriptions)
413 func (r *Registry) GetSubscription(subId uint32) *Subscription {
415 defer r.mutex.Unlock()
416 if _, ok := r.register[subId]; ok {
417 return r.register[subId]
422 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
424 defer r.mutex.Unlock()
425 for _, subId := range subIds {
426 if _, ok := r.register[subId]; ok {
427 return r.register[subId], nil
430 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)