Added duplicate detection changes
[ric-plt/submgr.git] / pkg / control / duplicate.go
1 /*
2 ==================================================================================
3   Copyright (c) 2021 Nokia
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 ==================================================================================
17 */
18
19 package control
20
21 import (
22         "bytes"
23         "crypto/md5"
24         "encoding/gob"
25         "encoding/hex"
26         "fmt"
27         "sync"
28         "time"
29
30         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
31 )
32
33 type retransEntry struct {
34         restSubsId string
35         startTime  time.Time
36 }
37
38 type duplicateCtrl struct {
39         mutex              sync.Mutex
40         ongoingRequestMap  map[string]retransEntry
41         previousRequestMap map[string]string
42         collCount          int
43 }
44
45 func (d *duplicateCtrl) Init() {
46         d.ongoingRequestMap = make(map[string]retransEntry)
47         d.previousRequestMap = make(map[string]string)
48 }
49
50 func (d *duplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) {
51
52         d.mutex.Lock()
53         defer d.mutex.Unlock()
54
55         if md5sum == "" {
56                 xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId)
57                 return
58         }
59
60         d.removeOngoingTransaction(md5sum)
61
62         prevRestSubsId, exists := d.previousRequestMap[md5sum]
63
64         if exists {
65                 if prevRestSubsId != restSubsId {
66                         xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId)
67                 } else {
68                         return
69                 }
70         } else {
71                 xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum)
72         }
73
74         d.previousRequestMap[md5sum] = restSubsId
75 }
76
77 func (d *duplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) {
78
79         d.mutex.Lock()
80         defer d.mutex.Unlock()
81
82         if md5sum == "" {
83                 return "", false
84         }
85
86         m, e := d.previousRequestMap[md5sum]
87
88         return m, e
89 }
90
91 func (d *duplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) {
92
93         d.mutex.Lock()
94         defer d.mutex.Unlock()
95
96         restSubsId, exists := d.previousRequestMap[md5sum]
97
98         if !exists {
99                 if md5sum == "" {
100                         xapp.Logger.Info("Attempted to delete a cached md5sum, md5sum not set yet")
101                 } else {
102                         xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum)
103                 }
104         } else {
105                 xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId)
106                 delete(d.previousRequestMap, md5sum)
107         }
108 }
109
110 func CalculateRequestMd5sum(payload interface{}) (string, error) {
111         var data bytes.Buffer
112         enc := gob.NewEncoder(&data)
113
114         if err := enc.Encode(payload); err != nil {
115                 xapp.Logger.Error("%s", err.Error())
116                 return "", err
117         }
118
119         hash := md5.Sum(data.Bytes())
120
121         return hex.EncodeToString(hash[:]), nil
122 }
123
124 func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool {
125
126         if md5sum == "" {
127                 return false
128         }
129
130         d.mutex.Lock()
131         defer d.mutex.Unlock()
132
133         entry, present := d.ongoingRequestMap[md5sum]
134
135         if present {
136                 xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
137                 d.collCount++
138                 return true
139         }
140
141         entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()}
142
143         xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
144
145         d.ongoingRequestMap[md5sum] = entry
146
147         return false
148 }
149
150 func (d *duplicateCtrl) TransactionComplete(md5sum string) error {
151
152         if md5sum == "" {
153                 return nil
154         }
155
156         d.mutex.Lock()
157         defer d.mutex.Unlock()
158
159         return d.removeOngoingTransaction(md5sum)
160 }
161
162 func (d *duplicateCtrl) removeOngoingTransaction(md5sum string) error {
163
164         entry, present := d.ongoingRequestMap[md5sum]
165
166         if !present {
167                 xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum)
168                 return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum)
169         }
170
171         xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum)
172
173         delete(d.ongoingRequestMap, md5sum)
174
175         return nil
176 }