2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 //-----------------------------------------------------------------------------
33 //-----------------------------------------------------------------------------
35 type Registry struct {
37 register map[uint32]*Subscription
39 rtmgrClient *RtmgrClient
42 func (r *Registry) Initialize() {
43 r.register = make(map[uint32]*Subscription)
45 for i = 1; i < 65535; i++ {
46 r.subIds = append(r.subIds, i)
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52 defer r.mutex.Unlock()
54 resp := models.SubscriptionList{}
55 for _, subs := range r.register {
57 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
64 if len(r.subIds) > 0 {
66 r.subIds = r.subIds[1:]
67 if _, ok := r.register[subId]; ok == true {
68 r.subIds = append(r.subIds, subId)
69 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71 subs := &Subscription{
80 DoNotWaitSubResp: false,
83 subs.ReqId.InstanceId = subId
84 if resetTestFlag == true {
85 subs.DoNotWaitSubResp = true
88 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
89 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
90 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
94 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
97 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, bool) {
99 for _, subs := range r.register {
100 if subs.IsMergeable(trans, subReqMsg) {
103 // check if there has been race conditions
106 //subs has been set to invalid
107 if subs.valid == false {
111 // If size is zero, entry is to be deleted
112 if subs.EpList.Size() == 0 {
116 // Try to add to endpointlist. Adding fails if endpoint is already in the list
117 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
119 xapp.Logger.Debug("Registry: Subs with requesting endpoint found. %s for %s", subs.String(), trans.String())
124 xapp.Logger.Debug("Registry: Mergeable subs found. %s for %s", subs.String(), trans.String())
131 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
135 defer r.mutex.Unlock()
138 // Check validity of subscription action types
140 actionType, err := r.CheckActionTypes(subReqMsg)
142 xapp.Logger.Debug("CREATE %s", err)
147 // Find possible existing Policy subscription
149 if actionType == e2ap.E2AP_ActionTypePolicy {
150 if subs, ok := r.register[trans.GetSubId()]; ok {
151 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found.", subs.String())
152 // Update message data to subscription
153 subs.SubReqMsg = subReqMsg
154 subs.SetCachedResponse(nil, true)
159 subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
161 subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag)
166 } else if endPointFound == true {
167 // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
168 subs.RetryFromXapp = true
169 xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
170 //xapp.Logger.Debug("Registry: substable=%v", r.register)
175 // Add to subscription
178 defer subs.mutex.Unlock()
180 epamount := subs.EpList.Size()
184 // Subscription route updates
187 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
188 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
190 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
191 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
197 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
203 r.register[subs.ReqId.InstanceId] = subs
205 xapp.Logger.Debug("CREATE %s", subs.String())
206 xapp.Logger.Debug("Registry: substable=%v", r.register)
210 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
211 var reportFound bool = false
212 var policyFound bool = false
213 var insertFound bool = false
215 for _, acts := range subReqMsg.ActionSetups {
216 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
219 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
222 if acts.ActionType == e2ap.E2AP_ActionTypeInsert {
226 if reportFound == true && policyFound == true || reportFound == true && insertFound == true || policyFound == true && insertFound == true {
227 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Different action types (Report, Policy or Insert) in same RICactions-ToBeSetup-List")
229 if reportFound == true {
230 return e2ap.E2AP_ActionTypeReport, nil
232 if policyFound == true {
233 return e2ap.E2AP_ActionTypePolicy, nil
235 if insertFound == true {
236 return e2ap.E2AP_ActionTypeInsert, nil
238 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
241 // TODO: Works with concurrent calls, but check if can be improved
242 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
245 defer r.mutex.Unlock()
247 defer subs.mutex.Unlock()
249 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
250 epamount := subs.EpList.Size()
251 subId := subs.ReqId.InstanceId
253 if delStatus == false {
258 if waitRouteClean > 0 {
259 time.Sleep(waitRouteClean)
263 defer subs.mutex.Unlock()
264 xapp.Logger.Info("CLEAN %s", subs.String())
268 // Subscription route delete
270 tmpList := xapp.RmrEndpointList{}
271 tmpList.AddEndpoint(trans.GetEndpoint())
272 subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
273 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
276 // Subscription release
279 defer r.mutex.Unlock()
281 if _, ok := r.register[subId]; ok {
282 xapp.Logger.Debug("RELEASE %s", subs.String())
283 delete(r.register, subId)
284 xapp.Logger.Debug("Registry: substable=%v", r.register)
286 r.subIds = append(r.subIds, subId)
287 } else if subs.EpList.Size() > 0 {
289 // Subscription route updates
291 subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
292 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
299 func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
301 defer r.mutex.Unlock()
303 defer subs.mutex.Unlock()
305 epamount := subs.EpList.Size()
307 if _, ok := r.register[subs.ReqId.InstanceId]; ok {
308 // Not merged subscription is being deleted
309 c.RemoveSubscriptionFromDb(subs)
312 } else if subs.EpList.Size() > 0 {
313 // Endpoint of merged subscription is being deleted
314 c.WriteSubscriptionToDb(subs)
318 func (r *Registry) GetSubscription(subId uint32) *Subscription {
320 defer r.mutex.Unlock()
321 if _, ok := r.register[subId]; ok {
322 return r.register[subId]
327 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
329 defer r.mutex.Unlock()
330 for _, subId := range subIds {
331 if _, ok := r.register[subId]; ok {
332 return r.register[subId], nil
335 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)