Added duplicate detection changes
[ric-plt/submgr.git] / pkg / control / duplicate.go
index c6d0c8c..6900941 100644 (file)
@@ -36,64 +36,141 @@ type retransEntry struct {
 }
 
 type duplicateCtrl struct {
-       mutex      sync.Mutex
-       retransMap map[string]retransEntry
-       collCount  int
+       mutex              sync.Mutex
+       ongoingRequestMap  map[string]retransEntry
+       previousRequestMap map[string]string
+       collCount          int
 }
 
 func (d *duplicateCtrl) Init() {
-       d.retransMap = make(map[string]retransEntry)
+       d.ongoingRequestMap = make(map[string]retransEntry)
+       d.previousRequestMap = make(map[string]string)
 }
 
-func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, payload interface{}) (error, bool, string) {
+func (d *duplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) {
 
+       d.mutex.Lock()
+       defer d.mutex.Unlock()
+
+       if md5sum == "" {
+               xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId)
+               return
+       }
+
+       d.removeOngoingTransaction(md5sum)
+
+       prevRestSubsId, exists := d.previousRequestMap[md5sum]
+
+       if exists {
+               if prevRestSubsId != restSubsId {
+                       xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId)
+               } else {
+                       return
+               }
+       } else {
+               xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum)
+       }
+
+       d.previousRequestMap[md5sum] = restSubsId
+}
+
+func (d *duplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) {
+
+       d.mutex.Lock()
+       defer d.mutex.Unlock()
+
+       if md5sum == "" {
+               return "", false
+       }
+
+       m, e := d.previousRequestMap[md5sum]
+
+       return m, e
+}
+
+func (d *duplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) {
+
+       d.mutex.Lock()
+       defer d.mutex.Unlock()
+
+       restSubsId, exists := d.previousRequestMap[md5sum]
+
+       if !exists {
+               if md5sum == "" {
+                       xapp.Logger.Info("Attempted to delete a cached md5sum, md5sum not set yet")
+               } else {
+                       xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum)
+               }
+       } else {
+               xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId)
+               delete(d.previousRequestMap, md5sum)
+       }
+}
+
+func CalculateRequestMd5sum(payload interface{}) (string, error) {
        var data bytes.Buffer
        enc := gob.NewEncoder(&data)
 
        if err := enc.Encode(payload); err != nil {
-               xapp.Logger.Error("Failed to encode %v\n", payload)
-               return err, false, ""
+               xapp.Logger.Error("%s", err.Error())
+               return "", err
        }
 
        hash := md5.Sum(data.Bytes())
 
-       md5sum := hex.EncodeToString(hash[:])
+       return hex.EncodeToString(hash[:]), nil
+}
+
+func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool {
+
+       if md5sum == "" {
+               return false
+       }
 
        d.mutex.Lock()
        defer d.mutex.Unlock()
 
-       entry, present := d.retransMap[md5sum]
+       entry, present := d.ongoingRequestMap[md5sum]
 
        if present {
-               xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with MD5SUM : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
+               xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
                d.collCount++
-               return nil, true, md5sum
+               return true
        }
 
        entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()}
 
-       xapp.Logger.Debug("Added Md5SUM %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
+       xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
 
-       d.retransMap[md5sum] = entry
+       d.ongoingRequestMap[md5sum] = entry
 
-       return nil, false, md5sum
+       return false
 }
 
 func (d *duplicateCtrl) TransactionComplete(md5sum string) error {
 
+       if md5sum == "" {
+               return nil
+       }
+
        d.mutex.Lock()
        defer d.mutex.Unlock()
 
-       entry, present := d.retransMap[md5sum]
+       return d.removeOngoingTransaction(md5sum)
+}
+
+func (d *duplicateCtrl) removeOngoingTransaction(md5sum string) error {
+
+       entry, present := d.ongoingRequestMap[md5sum]
 
        if !present {
-               xapp.Logger.Error("MD5SUM : %s NOT found from table (%v)\n", md5sum, entry)
-               return fmt.Errorf("Retransmission entry not found for MD5SUM %s", md5sum)
+               xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum)
+               return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum)
        }
 
-       xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, MD5SUM : %s\n", entry.restSubsId, md5sum)
+       xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum)
 
-       delete(d.retransMap, md5sum)
+       delete(d.ongoingRequestMap, md5sum)
 
        return nil
 }