X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fduplicate.go;h=8b7dcdc18bb48cbf0aac3c25aa75d18a36c0e0ad;hb=HEAD;hp=c6d0c8ca880ead2bb192e392daac7440895f3ddc;hpb=b642a19e7527f03458f1b6ca47bca132019aa2cf;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/duplicate.go b/pkg/control/duplicate.go index c6d0c8c..8b7dcdc 100644 --- a/pkg/control/duplicate.go +++ b/pkg/control/duplicate.go @@ -30,70 +30,150 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" ) -type retransEntry struct { +type RetransEntry struct { restSubsId string startTime time.Time } -type duplicateCtrl struct { - mutex sync.Mutex - retransMap map[string]retransEntry - collCount int +type DuplicateCtrl struct { + mutex sync.Mutex + ongoingRequestMap map[string]RetransEntry + previousRequestMap map[string]string + collCount int } -func (d *duplicateCtrl) Init() { - d.retransMap = make(map[string]retransEntry) +func (d *DuplicateCtrl) Init() { + d.ongoingRequestMap = make(map[string]RetransEntry) + d.previousRequestMap = make(map[string]string) } -func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, payload interface{}) (error, bool, string) { +func (d *DuplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) { + d.mutex.Lock() + defer d.mutex.Unlock() + + if md5sum == "" { + xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId) + return + } + + err := d.removeOngoingTransaction(md5sum) + if err != nil { + xapp.Logger.Error("removeOngoingTransaction() failed:%s", err.Error()) + } + + prevRestSubsId, exists := d.previousRequestMap[md5sum] + + if exists { + if prevRestSubsId != restSubsId { + xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId) + } else { + return + } + } else { + xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum) + } + + d.previousRequestMap[md5sum] = restSubsId +} + +func (d *DuplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) { + + d.mutex.Lock() + defer d.mutex.Unlock() + + if md5sum == "" { + return "", false + } + + m, e := d.previousRequestMap[md5sum] + + return m, e +} + +func (d *DuplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) { + + d.mutex.Lock() + defer d.mutex.Unlock() + + restSubsId, exists := d.previousRequestMap[md5sum] + + if !exists { + if md5sum == "" { + xapp.Logger.Debug("Attempted to delete a cached md5sum, md5sum not set yet") + } else { + xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum) + } + } else { + xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId) + delete(d.previousRequestMap, md5sum) + } +} + +func CalculateRequestMd5sum(payload interface{}) (string, error) { var data bytes.Buffer enc := gob.NewEncoder(&data) if err := enc.Encode(payload); err != nil { - xapp.Logger.Error("Failed to encode %v\n", payload) - return err, false, "" + xapp.Logger.Error("%s", err.Error()) + return "", err } hash := md5.Sum(data.Bytes()) - md5sum := hex.EncodeToString(hash[:]) + return hex.EncodeToString(hash[:]), nil +} + +func (d *DuplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool { + + if md5sum == "" { + return false + } d.mutex.Lock() defer d.mutex.Unlock() - entry, present := d.retransMap[md5sum] + entry, present := d.ongoingRequestMap[md5sum] if present { - xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with MD5SUM : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC)) + xapp.Logger.Debug("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC)) d.collCount++ - return nil, true, md5sum + return true } - entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()} + entry = RetransEntry{restSubsId: restSubsId, startTime: time.Now()} - xapp.Logger.Debug("Added Md5SUM %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime) + xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime) - d.retransMap[md5sum] = entry + d.ongoingRequestMap[md5sum] = entry - return nil, false, md5sum + return false } -func (d *duplicateCtrl) TransactionComplete(md5sum string) error { +func (d *DuplicateCtrl) TransactionComplete(md5sum string) error { + + if md5sum == "" { + return nil + } d.mutex.Lock() defer d.mutex.Unlock() - entry, present := d.retransMap[md5sum] + return d.removeOngoingTransaction(md5sum) +} + +func (d *DuplicateCtrl) removeOngoingTransaction(md5sum string) error { + + entry, present := d.ongoingRequestMap[md5sum] if !present { - xapp.Logger.Error("MD5SUM : %s NOT found from table (%v)\n", md5sum, entry) - return fmt.Errorf("Retransmission entry not found for MD5SUM %s", md5sum) + xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum) + return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum) } - xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, MD5SUM : %s\n", entry.restSubsId, md5sum) + xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum) - delete(d.retransMap, md5sum) + delete(d.ongoingRequestMap, md5sum) return nil }