2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 //-----------------------------------------------------------------------------
33 //-----------------------------------------------------------------------------
35 type Registry struct {
37 register map[uint32]*Subscription
39 rtmgrClient *RtmgrClient
42 func (r *Registry) Initialize() {
43 r.register = make(map[uint32]*Subscription)
45 for i = 1; i < 65535; i++ {
46 r.subIds = append(r.subIds, i)
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52 defer r.mutex.Unlock()
54 resp := models.SubscriptionList{}
55 for _, subs := range r.register {
57 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
64 if len(r.subIds) > 0 {
66 r.subIds = r.subIds[1:]
67 if _, ok := r.register[subId]; ok == true {
68 r.subIds = append(r.subIds, subId)
69 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71 subs := &Subscription{
80 DoNotWaitSubResp: false,
83 subs.ReqId.InstanceId = subId
84 if resetTestFlag == true {
85 subs.DoNotWaitSubResp = true
88 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
89 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
90 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
94 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
97 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
99 for _, subs := range r.register {
100 if subs.IsMergeable(trans, subReqMsg) {
103 // check if there has been race conditions
106 //subs has been set to invalid
107 if subs.valid == false {
111 // If size is zero, entry is to be deleted
112 if subs.EpList.Size() == 0 {
116 // Try to add to endpointlist. Adding fails if endpoint is already in the list
117 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
119 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
124 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
131 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) {
135 defer r.mutex.Unlock()
138 // Check validity of subscription action types
140 actionType, err := r.CheckActionTypes(subReqMsg)
142 xapp.Logger.Debug("CREATE %s", err)
147 // Find possible existing Policy subscription
149 if actionType == e2ap.E2AP_ActionTypePolicy {
150 if subs, ok := r.register[trans.GetSubId()]; ok {
151 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
152 // Update message data to subscription
153 subs.SubReqMsg = subReqMsg
154 subs.SetCachedResponse(nil, true)
159 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
161 if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil {
165 } else if endPointFound == true {
166 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
167 subs.RetryFromXapp = true
168 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
173 // Add to subscription
176 defer subs.mutex.Unlock()
178 epamount := subs.EpList.Size()
179 xapp.Logger.Info("AssignToSubscription subs.EpList.Size() = %v", subs.EpList.Size())
183 // Subscription route updates
186 err = r.RouteCreate(subs, c)
188 err = r.RouteCreateUpdate(subs, c)
194 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
196 // Delete already added endpoint for the request
197 subs.EpList.DelEndpoint(trans.GetEndpoint())
202 r.register[subs.ReqId.InstanceId] = subs
204 xapp.Logger.Debug("CREATE %s", subs.String())
205 xapp.Logger.Debug("Registry: substable=%v", r.register)
209 func (r *Registry) RouteCreate(subs *Subscription, c *Control) error {
210 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
211 err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
213 c.UpdateCounter(cRouteCreateFail)
218 func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error {
219 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
220 err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
222 c.UpdateCounter(cRouteCreateUpdateFail)
225 c.UpdateCounter(cMergedSubscriptions)
229 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
230 var reportFound bool = false
231 var policyFound bool = false
232 var insertFound bool = false
234 for _, acts := range subReqMsg.ActionSetups {
235 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
238 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
241 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
245 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
246 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
248 if reportFound == true {
249 return e2ap.E2AP_ActionTypeReport, nil
251 if policyFound == true {
252 return e2ap.E2AP_ActionTypePolicy, nil
254 if insertFound == true {
255 return e2ap.E2AP_ActionTypeInsert, nil
257 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
260 // TODO: Works with concurrent calls, but check if can be improved
261 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
264 defer r.mutex.Unlock()
266 defer subs.mutex.Unlock()
268 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
269 epamount := subs.EpList.Size()
270 subId := subs.ReqId.InstanceId
272 if delStatus == false {
277 if waitRouteClean > 0 {
278 time.Sleep(waitRouteClean)
282 defer subs.mutex.Unlock()
283 xapp.Logger.Info("CLEAN %s", subs.String())
287 // Subscription route delete
289 r.RouteDelete(subs, trans, c)
292 // Subscription release
295 defer r.mutex.Unlock()
297 if _, ok := r.register[subId]; ok {
298 xapp.Logger.Debug("RELEASE %s", subs.String())
299 delete(r.register, subId)
300 xapp.Logger.Debug("Registry: substable=%v", r.register)
302 r.subIds = append(r.subIds, subId)
303 } else if subs.EpList.Size() > 0 {
305 // Subscription route update
307 r.RouteDeleteUpdate(subs, c)
314 func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) {
315 tmpList := xapp.RmrEndpointList{}
316 tmpList.AddEndpoint(trans.GetEndpoint())
317 subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)}
318 if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil {
319 c.UpdateCounter(cRouteDeleteFail)
323 func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) {
324 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
325 if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil {
326 c.UpdateCounter(cRouteDeleteUpdateFail)
330 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
332 defer r.mutex.Unlock()
334 defer subs.mutex.Unlock()
336 epamount := subs.EpList.Size()
338 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
339 // Not merged subscription is being deleted
340 c.RemoveSubscriptionFromDb(subs)
343 } else if subs.EpList.Size() > 0 {
344 // Endpoint of merged subscription is being deleted
345 c.WriteSubscriptionToDb(subs)
346 c.UpdateCounter(cUnmergedSubscriptions)
350 func (r *Registry) GetSubscription(subId uint32) *Subscription {
352 defer r.mutex.Unlock()
353 if _, ok := r.register[subId]; ok {
354 return r.register[subId]
359 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
361 defer r.mutex.Unlock()
362 for _, subId := range subIds {
363 if _, ok := r.register[subId]; ok {
364 return r.register[subId], nil
367 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)