RICPLT-3014 Subscription multiple endpoints
[ric-plt/submgr.git] / pkg / control / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "fmt"
24         "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
26         "strconv"
27         "sync"
28         "time"
29 )
30
31 //-----------------------------------------------------------------------------
32 //
33 //-----------------------------------------------------------------------------
34 type Subscription struct {
35         mutex      sync.Mutex      // Lock
36         registry   *Registry       // Registry
37         Seq        uint16          // SubsId
38         Meid       *xapp.RMRMeid   // Meid/ RanName
39         EpList     RmrEndpointList // Endpoints
40         DelEpList  RmrEndpointList // Endpoints
41         DelSeq     uint64
42         TransLock  sync.Mutex                     // Lock transactions, only one executed per time for subs
43         TheTrans   *Transaction                   // Ongoing transaction from xapp
44         SubReqMsg  *e2ap.E2APSubscriptionRequest  // Subscription information
45         SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information
46 }
47
48 func (s *Subscription) stringImpl() string {
49         return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
50 }
51
52 func (s *Subscription) String() string {
53         s.mutex.Lock()
54         defer s.mutex.Unlock()
55         return s.stringImpl()
56 }
57
58 func (s *Subscription) GetSubId() uint16 {
59         s.mutex.Lock()
60         defer s.mutex.Unlock()
61         return s.Seq
62 }
63
64 func (s *Subscription) GetMeid() *xapp.RMRMeid {
65         s.mutex.Lock()
66         defer s.mutex.Unlock()
67         if s.Meid != nil {
68                 return s.Meid
69         }
70         return nil
71 }
72
73 func (s *Subscription) AddEndpoint(ep *RmrEndpoint) error {
74         s.mutex.Lock()
75         defer s.mutex.Unlock()
76         if ep == nil {
77                 return fmt.Errorf("AddEndpoint no endpoint given")
78         }
79         if s.EpList.AddEndpoint(ep) {
80                 s.DelEpList.DelEndpoint(ep)
81                 if s.EpList.Size() == 1 {
82                         return s.updateRouteImpl(CREATE)
83                 }
84                 return s.updateRouteImpl(MERGE)
85         }
86         return nil
87 }
88
89 func (s *Subscription) DelEndpoint(ep *RmrEndpoint) error {
90         s.mutex.Lock()
91         defer s.mutex.Unlock()
92         var err error
93         if ep == nil {
94                 return fmt.Errorf("DelEndpoint no endpoint given")
95         }
96         if s.EpList.HasEndpoint(ep) == false {
97                 return fmt.Errorf("DelEndpoint endpoint not found")
98         }
99         if s.DelEpList.HasEndpoint(ep) == true {
100                 return fmt.Errorf("DelEndpoint endpoint already under del")
101         }
102         s.DelEpList.AddEndpoint(ep)
103         go s.CleanCheck()
104         return err
105 }
106
107 func (s *Subscription) CleanCheck() {
108         s.mutex.Lock()
109         defer s.mutex.Unlock()
110         s.DelSeq++
111         // Only one clean ongoing
112         if s.DelSeq > 1 {
113                 return
114         }
115         var currSeq uint64 = 0
116         // Make sure that routes to be deleted
117         // are not deleted too fast
118         for currSeq < s.DelSeq {
119                 currSeq = s.DelSeq
120                 s.mutex.Unlock()
121                 time.Sleep(5 * time.Second)
122                 s.mutex.Lock()
123         }
124         xapp.Logger.Info("DelEndpoint: delete cleaning %s", s.stringImpl())
125         if s.EpList.Size() <= s.DelEpList.Size() {
126                 s.updateRouteImpl(DELETE)
127                 go s.registry.DelSubscription(s.Seq)
128         } else if s.EpList.DelEndpoints(&s.DelEpList) {
129                 s.updateRouteImpl(MERGE)
130         }
131         s.DelSeq = 0
132
133 }
134
135 func (s *Subscription) IsTransactionReserved() bool {
136         s.mutex.Lock()
137         defer s.mutex.Unlock()
138         if s.TheTrans != nil {
139                 return true
140         }
141         return false
142
143 }
144
145 func (s *Subscription) GetTransaction() *Transaction {
146         s.mutex.Lock()
147         defer s.mutex.Unlock()
148         return s.TheTrans
149 }
150
151 func (s *Subscription) WaitTransactionTurn(trans *Transaction) {
152         s.TransLock.Lock()
153         s.mutex.Lock()
154         s.TheTrans = trans
155         s.mutex.Unlock()
156 }
157
158 func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) {
159         s.mutex.Lock()
160         if trans != nil && trans == s.TheTrans {
161                 s.TheTrans = nil
162         }
163         s.mutex.Unlock()
164         s.TransLock.Unlock()
165 }
166
167 func (s *Subscription) updateRouteImpl(act Action) error {
168         subRouteAction := SubRouteInfo{act, s.EpList, s.Seq}
169         err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
170         if err != nil {
171                 return fmt.Errorf("%s %s", s.stringImpl(), err.Error())
172         }
173         return nil
174 }