Merge "[RICPLT-2585] Init fixes | Init UTs.........." into PI3
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
index 7243cde..bd75e7b 100644 (file)
@@ -16,6 +16,7 @@ import (
 
 // Id -> Command
 var configuration = make(map[string]*models.JsonCommand)
+
 // Rmr Message Id -> Command
 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
 
@@ -31,8 +32,8 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma
 }
 
 type Dispatcher struct {
-       rmrService              *rmr.Service
-       processResult           models.ProcessResult
+       rmrService    *rmr.Service
+       processResult models.ProcessResult
 }
 
 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
@@ -41,7 +42,7 @@ func (d *Dispatcher) GetProcessResult() models.ProcessResult {
 
 func New(rmrService *rmr.Service) *Dispatcher {
        return &Dispatcher{
-               rmrService:              rmrService,
+               rmrService: rmrService,
        }
 }
 
@@ -65,11 +66,11 @@ func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
        if err != nil {
                log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
                d.processResult.Err = err
-               d.processResult.Stats.SentErrorCount++
+               d.processResult.Stats.SentErrorCount.Inc()
                return err
        }
 
-       d.processResult.Stats.SentCount++
+       d.processResult.Stats.SentCount.Inc()
        return nil
 }
 
@@ -86,11 +87,11 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm
 
                if err != nil {
                        log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
-                       d.processResult.Stats.SentErrorCount++
+                       d.processResult.Stats.SentErrorCount.Inc()
                        continue
                }
 
-               d.processResult.Stats.SentCount++
+               d.processResult.Stats.SentCount.Inc()
                time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
        }
 }
@@ -212,6 +213,16 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom
        sendAndReceiveWg.Wait()
 }
 
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+       responseCommand, ok := configuration[command.SendCommandId]
+
+       if !ok {
+               return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+       }
+
+       return responseCommand, nil
+}
+
 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
        for {
                select {
@@ -223,24 +234,33 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
+                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
                        d.processResult.Err = err
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        return
                }
 
+               messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               log.Printf("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
 
                if len(command.SendCommandId) > 0 {
-                       responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
+                       responseCommand, err := getResponseCommand(command)
+
+                       if err != nil {
+                               d.processResult.Err = err
+                               return
+                       }
+
                        _ = d.sendNoRepeat(*responseCommand)
                }
 
@@ -249,7 +269,20 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models
 }
 
 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
-       for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
+
+       var responseCommand *models.JsonCommand
+
+       if len(command.SendCommandId) > 0 {
+               var err error
+               responseCommand, err = getResponseCommand(command)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+       }
+
+       for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
                select {
                case <-ctx.Done():
                        return
@@ -260,23 +293,24 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command mode
 
                if err != nil {
                        log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        continue
                }
 
+               messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               log.Printf("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
 
-               if len(command.SendCommandId) > 0 {
-                       responseCommand := configuration[command.SendCommandId]
+               if responseCommand != nil {
                        _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
                }
        }