2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 //-----------------------------------------------------------------------------
33 //-----------------------------------------------------------------------------
35 type Registry struct {
37 register map[uint32]*Subscription
39 rtmgrClient *RtmgrClient
42 func (r *Registry) Initialize() {
43 r.register = make(map[uint32]*Subscription)
45 for i = 0; i < 65535; i++ {
46 r.subIds = append(r.subIds, i+1)
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52 defer r.mutex.Unlock()
54 resp := models.SubscriptionList{}
55 for _, subs := range r.register {
57 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.InstanceId), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64 if len(r.subIds) > 0 {
66 r.subIds = r.subIds[1:]
67 if _, ok := r.register[subId]; ok == true {
68 r.subIds = append(r.subIds, subId)
69 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71 subs := &Subscription{
78 subs.ReqId.InstanceId = subId
80 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
82 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
87 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
92 for _, subs := range r.register {
93 if subs.IsMergeable(trans, subReqMsg) {
96 // check if there has been race conditions
99 //subs has been set to invalid
100 if subs.valid == false {
104 // If size is zero, entry is to be deleted
105 if subs.EpList.Size() == 0 {
109 // try to add to endpointlist.
110 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
116 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
123 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
127 defer r.mutex.Unlock()
130 // Check validity of subscription action types
132 actionType, err := r.CheckActionTypes(subReqMsg)
134 xapp.Logger.Debug("CREATE %s", err)
139 // Find possible existing Policy subscription
141 if actionType == e2ap.E2AP_ActionTypePolicy {
142 if subs, ok := r.register[trans.GetSubId()]; ok {
143 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
144 // Update message data to subscription
145 subs.SubReqMsg = subReqMsg
146 subs.SetCachedResponse(nil, true)
151 subs := r.findExistingSubs(trans, subReqMsg)
153 subs, err = r.allocateSubs(trans, subReqMsg)
161 // Add to subscription
164 defer subs.mutex.Unlock()
166 epamount := subs.EpList.Size()
170 // Subscription route updates
173 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
174 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
176 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)}
177 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
183 r.subIds = append(r.subIds, subs.ReqId.InstanceId)
189 r.register[subs.ReqId.InstanceId] = subs
191 xapp.Logger.Debug("CREATE %s", subs.String())
192 xapp.Logger.Debug("Registry: substable=%v", r.register)
196 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
197 var reportFound bool = false
198 var policyFound bool = false
200 for _, acts := range subReqMsg.ActionSetups {
201 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
204 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
208 if reportFound == true && policyFound == true {
209 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
211 if reportFound == true {
212 return e2ap.E2AP_ActionTypeReport, nil
214 if policyFound == true {
215 return e2ap.E2AP_ActionTypePolicy, nil
217 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
220 // TODO: Works with concurrent calls, but check if can be improved
221 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
224 defer r.mutex.Unlock()
226 defer subs.mutex.Unlock()
228 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
229 epamount := subs.EpList.Size()
230 subId := subs.ReqId.InstanceId
232 if delStatus == false {
237 if waitRouteClean > 0 {
238 time.Sleep(waitRouteClean)
242 defer subs.mutex.Unlock()
243 xapp.Logger.Info("CLEAN %s", subs.String())
247 // Subscription route delete
249 tmpList := xapp.RmrEndpointList{}
250 tmpList.AddEndpoint(trans.GetEndpoint())
251 subRouteAction := SubRouteInfo{tmpList, uint16(subId)}
252 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
255 // Subscription release
258 defer r.mutex.Unlock()
260 if _, ok := r.register[subId]; ok {
261 xapp.Logger.Debug("RELEASE %s", subs.String())
262 delete(r.register, subId)
263 xapp.Logger.Debug("Registry: substable=%v", r.register)
265 r.subIds = append(r.subIds, subId)
267 } else if subs.EpList.Size() > 0 {
269 // Subscription route updates
271 subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
272 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
280 func (r *Registry) GetSubscription(subId uint32) *Subscription {
282 defer r.mutex.Unlock()
283 if _, ok := r.register[subId]; ok {
284 return r.register[subId]
289 func (r *Registry) GetSubscriptionFirstMatch(subIds []uint32) (*Subscription, error) {
291 defer r.mutex.Unlock()
292 for _, subId := range subIds {
293 if _, ok := r.register[subId]; ok {
294 return r.register[subId], nil
297 return nil, fmt.Errorf("No valid subscription found with subIds %v", subIds)