X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fduplicate.go;fp=pkg%2Fcontrol%2Fduplicate.go;h=6900941ac2d70cbfdfd18b018a1b341da5545f88;hb=42723e2593926f1cfa144b503bf043a0fe36e657;hp=c6d0c8ca880ead2bb192e392daac7440895f3ddc;hpb=268d715e3bceab8f7955d89945141efdb2c3b368;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/duplicate.go b/pkg/control/duplicate.go index c6d0c8c..6900941 100644 --- a/pkg/control/duplicate.go +++ b/pkg/control/duplicate.go @@ -36,64 +36,141 @@ type retransEntry struct { } type duplicateCtrl struct { - mutex sync.Mutex - retransMap map[string]retransEntry - collCount int + mutex sync.Mutex + ongoingRequestMap map[string]retransEntry + previousRequestMap map[string]string + collCount int } func (d *duplicateCtrl) Init() { - d.retransMap = make(map[string]retransEntry) + d.ongoingRequestMap = make(map[string]retransEntry) + d.previousRequestMap = make(map[string]string) } -func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, payload interface{}) (error, bool, string) { +func (d *duplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) { + d.mutex.Lock() + defer d.mutex.Unlock() + + if md5sum == "" { + xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId) + return + } + + d.removeOngoingTransaction(md5sum) + + prevRestSubsId, exists := d.previousRequestMap[md5sum] + + if exists { + if prevRestSubsId != restSubsId { + xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId) + } else { + return + } + } else { + xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum) + } + + d.previousRequestMap[md5sum] = restSubsId +} + +func (d *duplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) { + + d.mutex.Lock() + defer d.mutex.Unlock() + + if md5sum == "" { + return "", false + } + + m, e := d.previousRequestMap[md5sum] + + return m, e +} + +func (d *duplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) { + + d.mutex.Lock() + defer d.mutex.Unlock() + + restSubsId, exists := d.previousRequestMap[md5sum] + + if !exists { + if md5sum == "" { + xapp.Logger.Info("Attempted to delete a cached md5sum, md5sum not set yet") + } else { + xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum) + } + } else { + xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId) + delete(d.previousRequestMap, md5sum) + } +} + +func CalculateRequestMd5sum(payload interface{}) (string, error) { var data bytes.Buffer enc := gob.NewEncoder(&data) if err := enc.Encode(payload); err != nil { - xapp.Logger.Error("Failed to encode %v\n", payload) - return err, false, "" + xapp.Logger.Error("%s", err.Error()) + return "", err } hash := md5.Sum(data.Bytes()) - md5sum := hex.EncodeToString(hash[:]) + return hex.EncodeToString(hash[:]), nil +} + +func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool { + + if md5sum == "" { + return false + } d.mutex.Lock() defer d.mutex.Unlock() - entry, present := d.retransMap[md5sum] + entry, present := d.ongoingRequestMap[md5sum] if present { - xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with MD5SUM : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC)) + xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC)) d.collCount++ - return nil, true, md5sum + return true } entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()} - xapp.Logger.Debug("Added Md5SUM %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime) + xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime) - d.retransMap[md5sum] = entry + d.ongoingRequestMap[md5sum] = entry - return nil, false, md5sum + return false } func (d *duplicateCtrl) TransactionComplete(md5sum string) error { + if md5sum == "" { + return nil + } + d.mutex.Lock() defer d.mutex.Unlock() - entry, present := d.retransMap[md5sum] + return d.removeOngoingTransaction(md5sum) +} + +func (d *duplicateCtrl) removeOngoingTransaction(md5sum string) error { + + entry, present := d.ongoingRequestMap[md5sum] if !present { - xapp.Logger.Error("MD5SUM : %s NOT found from table (%v)\n", md5sum, entry) - return fmt.Errorf("Retransmission entry not found for MD5SUM %s", md5sum) + xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum) + return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum) } - xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, MD5SUM : %s\n", entry.restSubsId, md5sum) + xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum) - delete(d.retransMap, md5sum) + delete(d.ongoingRequestMap, md5sum) return nil }