X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fduplicate.go;fp=pkg%2Fcontrol%2Fduplicate.go;h=c6d0c8ca880ead2bb192e392daac7440895f3ddc;hb=b642a19e7527f03458f1b6ca47bca132019aa2cf;hp=0000000000000000000000000000000000000000;hpb=55d2a285e4914afce7492c6b4b6feebe5485210b;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/duplicate.go b/pkg/control/duplicate.go new file mode 100644 index 0000000..c6d0c8c --- /dev/null +++ b/pkg/control/duplicate.go @@ -0,0 +1,99 @@ +/* +================================================================================== + Copyright (c) 2021 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package control + +import ( + "bytes" + "crypto/md5" + "encoding/gob" + "encoding/hex" + "fmt" + "sync" + "time" + + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" +) + +type retransEntry struct { + restSubsId string + startTime time.Time +} + +type duplicateCtrl struct { + mutex sync.Mutex + retransMap map[string]retransEntry + collCount int +} + +func (d *duplicateCtrl) Init() { + d.retransMap = make(map[string]retransEntry) +} + +func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, payload interface{}) (error, bool, string) { + + var data bytes.Buffer + enc := gob.NewEncoder(&data) + + if err := enc.Encode(payload); err != nil { + xapp.Logger.Error("Failed to encode %v\n", payload) + return err, false, "" + } + + hash := md5.Sum(data.Bytes()) + + md5sum := hex.EncodeToString(hash[:]) + + d.mutex.Lock() + defer d.mutex.Unlock() + + entry, present := d.retransMap[md5sum] + + if present { + xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with MD5SUM : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC)) + d.collCount++ + return nil, true, md5sum + } + + entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()} + + xapp.Logger.Debug("Added Md5SUM %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime) + + d.retransMap[md5sum] = entry + + return nil, false, md5sum +} + +func (d *duplicateCtrl) TransactionComplete(md5sum string) error { + + d.mutex.Lock() + defer d.mutex.Unlock() + + entry, present := d.retransMap[md5sum] + + if !present { + xapp.Logger.Error("MD5SUM : %s NOT found from table (%v)\n", md5sum, entry) + return fmt.Errorf("Retransmission entry not found for MD5SUM %s", md5sum) + } + + xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, MD5SUM : %s\n", entry.restSubsId, md5sum) + + delete(d.retransMap, md5sum) + + return nil +}