Fixed function id handling and improved ut fail handling
[ric-plt/submgr.git] / pkg / control / registry.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
26         "sync"
27         "time"
28 )
29
//-----------------------------------------------------------------------------
// Registry keeps track of the subscriptions known to this process together
// with the pool of subscription sequence numbers that are currently free.
//-----------------------------------------------------------------------------
type Registry struct {
	// mutex is taken by the Registry methods in this file before they touch
	// register or subIds.
	mutex sync.Mutex
	// register maps a subscription sequence number to its Subscription.
	register map[uint16]*Subscription
	// subIds is the list of free sequence numbers; ids are taken from the
	// front and returned to the back.
	subIds []uint16
	// rtmgrClient is used to push subscription route updates. It is not set
	// by Initialize, so presumably the owner assigns it -- TODO confirm.
	rtmgrClient *RtmgrClient
}
39
40 func (r *Registry) Initialize() {
41         r.register = make(map[uint16]*Subscription)
42         var i uint16
43         for i = 0; i < 65535; i++ {
44                 r.subIds = append(r.subIds, i+1)
45         }
46 }
47
48 func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
49         r.mutex.Lock()
50         defer r.mutex.Unlock()
51
52         var sequenceNumber uint16
53
54         //
55         // Allocate subscription
56         //
57         if len(r.subIds) > 0 {
58                 sequenceNumber = r.subIds[0]
59                 r.subIds = r.subIds[1:]
60                 if _, ok := r.register[sequenceNumber]; ok == true {
61                         r.subIds = append(r.subIds, sequenceNumber)
62                         return nil, fmt.Errorf("Registry: Failed to reserves subscription")
63                 }
64         } else {
65                 return nil, fmt.Errorf("Registry: Failed to reserves subscription no free ids")
66         }
67         subs := &Subscription{
68                 registry: r,
69                 Seq:      sequenceNumber,
70                 Meid:     trans.Meid,
71         }
72
73         //
74         // Add to subscription
75         //
76         subs.mutex.Lock()
77         defer subs.mutex.Unlock()
78
79         if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
80                 r.subIds = append(r.subIds, sequenceNumber)
81                 return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
82         }
83         epamount := subs.EpList.Size()
84
85         r.mutex.Unlock()
86         //
87         // Subscription route updates
88         //
89         var err error
90         if epamount == 1 {
91                 subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
92                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
93         } else {
94                 subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
95                 err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
96         }
97         r.mutex.Lock()
98
99         if err != nil {
100                 r.subIds = append(r.subIds, sequenceNumber)
101                 return nil, err
102         }
103         subs.SubReqMsg = subReqMsg
104
105         r.register[sequenceNumber] = subs
106         xapp.Logger.Debug("Registry: Create %s", subs.String())
107         xapp.Logger.Debug("Registry: substable=%v", r.register)
108         return subs, nil
109 }
110
111 func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
112         r.mutex.Lock()
113         defer r.mutex.Unlock()
114         subs.mutex.Lock()
115         defer subs.mutex.Unlock()
116
117         delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
118         epamount := subs.EpList.Size()
119
120         //
121         // If last endpoint remove from register map
122         //
123         if epamount == 0 {
124                 if _, ok := r.register[subs.Seq]; ok {
125                         xapp.Logger.Debug("Registry: Delete %s", subs.String())
126                         delete(r.register, subs.Seq)
127                         xapp.Logger.Debug("Registry: substable=%v", r.register)
128                 }
129         }
130         r.mutex.Unlock()
131
132         //
133         // Wait some time before really do route updates
134         //
135         if waitRouteClean > 0 {
136                 subs.mutex.Unlock()
137                 time.Sleep(waitRouteClean)
138                 subs.mutex.Lock()
139         }
140
141         xapp.Logger.Info("Registry: Cleaning %s", subs.String())
142
143         //
144         // Subscription route updates
145         //
146         if delStatus {
147                 if epamount == 0 {
148                         tmpList := RmrEndpointList{}
149                         tmpList.AddEndpoint(trans.GetEndpoint())
150                         subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
151                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
152                 } else {
153                         subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
154                         r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
155                 }
156         }
157
158         r.mutex.Lock()
159         //
160         // If last endpoint free seq nro
161         //
162         if epamount == 0 {
163                 r.subIds = append(r.subIds, subs.Seq)
164         }
165
166         return nil
167 }
168
169 func (r *Registry) GetSubscription(sn uint16) *Subscription {
170         r.mutex.Lock()
171         defer r.mutex.Unlock()
172         if _, ok := r.register[sn]; ok {
173                 return r.register[sn]
174         }
175         return nil
176 }
177
178 func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
179         r.mutex.Lock()
180         defer r.mutex.Unlock()
181         for _, id := range ids {
182                 if _, ok := r.register[id]; ok {
183                         return r.register[id], nil
184                 }
185         }
186         return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
187 }