2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
26 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 //-----------------------------------------------------------------------------
33 //-----------------------------------------------------------------------------
// Registry is the subscription table of this package.
//
// register maps a uint32 key to its *Subscription; the methods of this type
// store and look up entries by the subscription's ReqId.Seq value.
// rtmgrClient is the client used to create, update and delete subscription
// routes.
//
// NOTE(review): methods of this type also use r.mutex and r.subIds, but the
// declarations of those fields are not visible in this view of the file —
// confirm against the complete struct definition.
35 type Registry struct {
37 register map[uint32]*Subscription
39 rtmgrClient *RtmgrClient
// Initialize creates an empty subscription register and appends the values
// 1..65535, in ascending order, to the free-id pool r.subIds.
//
// NOTE(review): the declaration of the loop counter i is not visible in this
// view of the file.
42 func (r *Registry) Initialize() {
43 r.register = make(map[uint32]*Subscription)
// i runs from 0 to 65534 and i+1 is stored, so the value 0 is never added to
// the pool.
45 for i = 0; i < 65535; i++ {
46 r.subIds = append(r.subIds, i+1)
// QueryHandler builds a models.SubscriptionList containing one
// models.SubscriptionData entry per subscription in the register. Entries are
// appended while ranging over a map, so their order is unspecified.
//
// NOTE(review): the r.mutex.Lock() call matching the deferred Unlock, any
// per-subscription locking inside the loop, and the return statement are not
// visible in this view of the file — confirm.
50 func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
52 defer r.mutex.Unlock()
54 resp := models.SubscriptionList{}
55 for _, subs := range r.register {
// SubscriptionID is ReqId.Seq converted to int64, Meid is the RanName of the
// subscription's Meid, and Endpoint is the endpoint list as returned by
// StringList().
57 resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.Seq), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
// allocateSubs takes the first id from the free-id pool r.subIds and builds a
// new Subscription whose ReqId.Seq is that id, with the transaction's endpoint
// added to its endpoint list.
//
// It returns an error when:
//   - the pool is empty ("no free ids");
//   - the id taken from the pool is already a key in the register;
//   - AddEndpoint reports false for the transaction's endpoint.
//
// In the last two cases the id is appended to the end of the pool again before
// returning.
//
// NOTE(review): the fields of the Subscription literal and the success return
// are not visible in this view of the file. No locking is visible in this
// function; presumably the caller holds r.mutex — confirm.
63 func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
64 if len(r.subIds) > 0 {
// Pop the head of the pool.
65 sequenceNumber := r.subIds[0]
66 r.subIds = r.subIds[1:]
// A free id that is still registered indicates an inconsistency; give the id
// back and fail.
67 if _, ok := r.register[sequenceNumber]; ok == true {
68 r.subIds = append(r.subIds, sequenceNumber)
69 return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
71 subs := &Subscription{
78 subs.ReqId.Seq = sequenceNumber
80 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
81 r.subIds = append(r.subIds, subs.ReqId.Seq)
82 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
87 return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
// findExistingSubs ranges over the register looking for a subscription for
// which IsMergeable(trans, subReqMsg) is true, and tries to add the
// transaction's endpoint to that subscription's endpoint list.
//
// For a mergeable candidate the visible code tests, in order, whether
// subs.valid is false, whether the endpoint list is empty, and whether
// AddEndpoint reports false. When a candidate is accepted a debug line
// "Mergeable subs found" is logged.
//
// NOTE(review): the bodies of those three checks, any per-subscription
// locking, and all return statements are not visible in this view of the
// file, so what happens after each check (skip, unlock, return) cannot be
// confirmed from here.
90 func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
92 for _, subs := range r.register {
93 if subs.IsMergeable(trans, subReqMsg) {
96 // check if there has been race conditions
99 //subs has been set to invalid
100 if subs.valid == false {
104 // If size is zero, entry is to be deleted
105 if subs.EpList.Size() == 0 {
109 // try to add to endpointlist.
110 if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
116 xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
// AssignToSubscription attaches the transaction to a subscription for
// subReqMsg and stores that subscription in the register.
//
// Steps visible in this view:
//  1. CheckActionTypes classifies the request's action types.
//  2. For a Policy request the register is looked up with trans.GetSubId();
//     when an entry exists its cached response is reset with
//     SetCachedResponse(nil, true).
//  3. findExistingSubs searches for a mergeable subscription, and
//     allocateSubs is called to create a new one.
//  4. The route manager is called with either SubscriptionRequestCreate or
//     SubscriptionRequestUpdate for the subscription's endpoint list.
//  5. The subscription's id is appended back to r.subIds.
//  6. The subscription is stored in the register under its ReqId.Seq.
//
// NOTE(review): many lines of this function are not visible in this view:
// the Lock calls matching the deferred Unlocks, the error checks, the
// conditions selecting between steps 3's two calls and between step 4's two
// calls (presumably based on epamount), the condition guarding step 5
// (presumably a route-manager error), and every return statement. Confirm
// against the complete function before relying on this description.
123 func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
127 defer r.mutex.Unlock()
130 // Check validity of subscription action types
132 actionType, err := r.CheckActionTypes(subReqMsg)
134 xapp.Logger.Debug("CREATE %s", err)
139 // Find possible existing Policy subscription
141 if actionType == e2ap.E2AP_ActionTypePolicy {
142 if subs, ok := r.register[trans.GetSubId()]; ok {
143 xapp.Logger.Debug("CREATE %s. Existing subscription for Policy found", subs.String())
144 subs.SetCachedResponse(nil, true)
149 subs := r.findExistingSubs(trans, subReqMsg)
151 subs, err = r.allocateSubs(trans, subReqMsg)
159 // Add to subscription
162 defer subs.mutex.Unlock()
164 epamount := subs.EpList.Size()
168 // Subscription route updates
// ReqId.Seq is narrowed to uint16 for the route info; the ids placed in the
// pool by Initialize are 1..65535, which fit in 16 bits.
171 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
172 err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
174 subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
175 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
181 r.subIds = append(r.subIds, subs.ReqId.Seq)
187 r.register[subs.ReqId.Seq] = subs
189 xapp.Logger.Debug("CREATE %s", subs.String())
190 xapp.Logger.Debug("Registry: substable=%v", r.register)
// CheckActionTypes examines every entry of subReqMsg.ActionSetups and
// classifies the request by the action types it contains.
//
// Returns:
//   - E2AP_ActionTypeInvalid and an error when both Report and Policy were
//     found;
//   - E2AP_ActionTypeReport and nil when Report was found;
//   - E2AP_ActionTypePolicy and nil when Policy was found;
//   - E2AP_ActionTypeInvalid and an error when neither was found.
//
// NOTE(review): the statements inside the two loop conditions are not visible
// in this view of the file; presumably they set reportFound and policyFound
// to true — confirm.
194 func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) {
195 var reportFound bool = false
196 var policyFound bool = false
198 for _, acts := range subReqMsg.ActionSetups {
199 if acts.ActionType == e2ap.E2AP_ActionTypeReport {
202 if acts.ActionType == e2ap.E2AP_ActionTypePolicy {
// The mixed case is tested first so that it is rejected before either
// single-type result can be returned.
206 if reportFound == true && policyFound == true {
207 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Report and Policy in same RICactions-ToBeSetup-List")
209 if reportFound == true {
210 return e2ap.E2AP_ActionTypeReport, nil
212 if policyFound == true {
213 return e2ap.E2AP_ActionTypePolicy, nil
215 return e2ap.E2AP_ActionTypeInvalid, fmt.Errorf("Invalid action type in RICactions-ToBeSetup-List")
218 // TODO: Works with concurrent calls, but check if can be improved
//
// RemoveFromSubscription removes the transaction's endpoint from the
// subscription's endpoint list and then cleans up routes and, where the
// visible release branch applies, the registry entry.
//
// Steps visible in this view:
//  1. The endpoint is deleted from subs.EpList; the resulting endpoint count
//     and the subscription's ReqId.Seq are captured in epamount and seqId.
//  2. The result of DelEndpoint is tested for false.
//  3. When waitRouteClean is positive the code sleeps for that duration.
//  4. A "CLEAN" line is logged.
//  5. Route delete: SubscriptionRequestDelete is called with a temporary list
//     holding only the transaction's endpoint.
//  6. Release: when seqId is a key in the register the entry is deleted and
//     seqId is appended back to the free-id pool r.subIds.
//  7. Otherwise, when the endpoint list is non-empty, SubscriptionRequestUpdate
//     is called with the remaining endpoints.
//
// The route-manager calls in steps 5 and 7 are made as plain statements, so
// nothing they return is used here.
//
// NOTE(review): r.mutex and subs.mutex each have two deferred Unlocks in this
// function, which suggests steps 3-7 run inside a nested function (possibly a
// goroutine) whose header is not visible in this view. Also not visible: the
// matching Lock calls, the body of the delStatus check, the condition that
// selects the delete branch (presumably epamount == 0), and every return
// statement. Confirm against the complete function.
219 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
222 defer r.mutex.Unlock()
224 defer subs.mutex.Unlock()
226 delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
227 epamount := subs.EpList.Size()
228 seqId := subs.ReqId.Seq
230 if delStatus == false {
235 if waitRouteClean > 0 {
236 time.Sleep(waitRouteClean)
240 defer subs.mutex.Unlock()
241 xapp.Logger.Info("CLEAN %s", subs.String())
245 // Subscription route delete
// Only the departing endpoint is sent in the delete request, not the
// subscription's own endpoint list.
247 tmpList := RmrEndpointList{}
248 tmpList.AddEndpoint(trans.GetEndpoint())
249 subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
250 r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
253 // Subscription release
256 defer r.mutex.Unlock()
258 if _, ok := r.register[seqId]; ok {
259 xapp.Logger.Debug("RELEASE %s", subs.String())
260 delete(r.register, seqId)
261 xapp.Logger.Debug("Registry: substable=%v", r.register)
// Make the id available for allocation again.
263 r.subIds = append(r.subIds, seqId)
265 } else if subs.EpList.Size() > 0 {
267 // Subscription route updates
269 subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
270 r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
// GetSubscription returns the subscription stored in the register under the
// key sn when such an entry exists.
//
// NOTE(review): the r.mutex.Lock() call matching the deferred Unlock and the
// return for a missing key (presumably nil) are not visible in this view of
// the file — confirm.
278 func (r *Registry) GetSubscription(sn uint32) *Subscription {
280 defer r.mutex.Unlock()
281 if _, ok := r.register[sn]; ok {
282 return r.register[sn]
// GetSubscriptionFirstMatch walks ids in the given order and returns the
// subscription for the first id that is a key in the register. When no id
// matches it returns nil and an error that lists the ids.
//
// NOTE(review): the r.mutex.Lock() call matching the deferred Unlock is not
// visible in this view of the file — confirm.
287 func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
289 defer r.mutex.Unlock()
290 for _, id := range ids {
291 if _, ok := r.register[id]; ok {
292 return r.register[id], nil
295 return nil, fmt.Errorf("No valid subscription found with ids %v", ids)