Delete all subscriptions when Ran is under reset.
[ric-plt/submgr.git] / pkg / control / duplicate.go
1 /*
2 ==================================================================================
3   Copyright (c) 2021 Nokia
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 package control
20
21 import (
22         "bytes"
23         "crypto/md5"
24         "encoding/gob"
25         "encoding/hex"
26         "fmt"
27         "sync"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 )
32
33 type RetransEntry struct {
34         restSubsId string
35         startTime  time.Time
36 }
37
38 type DuplicateCtrl struct {
39         mutex              sync.Mutex
40         ongoingRequestMap  map[string]RetransEntry
41         previousRequestMap map[string]string
42         collCount          int
43 }
44
45 func (d *DuplicateCtrl) Init() {
46         d.ongoingRequestMap = make(map[string]RetransEntry)
47         d.previousRequestMap = make(map[string]string)
48 }
49
50 func (d *DuplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) {
51
52         d.mutex.Lock()
53         defer d.mutex.Unlock()
54
55         if md5sum == "" {
56                 xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId)
57                 return
58         }
59
60         err := d.removeOngoingTransaction(md5sum)
61         if err != nil {
62                 xapp.Logger.Error("removeOngoingTransaction() failed:%s", err.Error())
63         }
64
65         prevRestSubsId, exists := d.previousRequestMap[md5sum]
66
67         if exists {
68                 if prevRestSubsId != restSubsId {
69                         xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId)
70                 } else {
71                         return
72                 }
73         } else {
74                 xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum)
75         }
76
77         d.previousRequestMap[md5sum] = restSubsId
78 }
79
80 func (d *DuplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) {
81
82         d.mutex.Lock()
83         defer d.mutex.Unlock()
84
85         if md5sum == "" {
86                 return "", false
87         }
88
89         m, e := d.previousRequestMap[md5sum]
90
91         return m, e
92 }
93
94 func (d *DuplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) {
95
96         d.mutex.Lock()
97         defer d.mutex.Unlock()
98
99         restSubsId, exists := d.previousRequestMap[md5sum]
100
101         if !exists {
102                 if md5sum == "" {
103                         xapp.Logger.Debug("Attempted to delete a cached md5sum, md5sum not set yet")
104                 } else {
105                         xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum)
106                 }
107         } else {
108                 xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId)
109                 delete(d.previousRequestMap, md5sum)
110         }
111 }
112
113 func CalculateRequestMd5sum(payload interface{}) (string, error) {
114         var data bytes.Buffer
115         enc := gob.NewEncoder(&data)
116
117         if err := enc.Encode(payload); err != nil {
118                 xapp.Logger.Error("%s", err.Error())
119                 return "", err
120         }
121
122         hash := md5.Sum(data.Bytes())
123
124         return hex.EncodeToString(hash[:]), nil
125 }
126
127 func (d *DuplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool {
128
129         if md5sum == "" {
130                 return false
131         }
132
133         d.mutex.Lock()
134         defer d.mutex.Unlock()
135
136         entry, present := d.ongoingRequestMap[md5sum]
137
138         if present {
139                 xapp.Logger.Debug("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
140                 d.collCount++
141                 return true
142         }
143
144         entry = RetransEntry{restSubsId: restSubsId, startTime: time.Now()}
145
146         xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
147
148         d.ongoingRequestMap[md5sum] = entry
149
150         return false
151 }
152
153 func (d *DuplicateCtrl) TransactionComplete(md5sum string) error {
154
155         if md5sum == "" {
156                 return nil
157         }
158
159         d.mutex.Lock()
160         defer d.mutex.Unlock()
161
162         return d.removeOngoingTransaction(md5sum)
163 }
164
165 func (d *DuplicateCtrl) removeOngoingTransaction(md5sum string) error {
166
167         entry, present := d.ongoingRequestMap[md5sum]
168
169         if !present {
170                 xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum)
171                 return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum)
172         }
173
174         xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum)
175
176         delete(d.ongoingRequestMap, md5sum)
177
178         return nil
179 }