[RICPLT-2813] Update Swagger: /e2t/list
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
index 7243cde..6e00592 100644 (file)
@@ -4,11 +4,11 @@ import (
        "context"
        "fmt"
        "github.com/pkg/errors"
-       "log"
        "reflect"
        "sync"
        "time"
        "xappmock/enums"
+       "xappmock/logger"
        "xappmock/models"
        "xappmock/rmr"
        "xappmock/sender"
@@ -16,6 +16,7 @@ import (
 
 // Id -> Command
 var configuration = make(map[string]*models.JsonCommand)
+
 // Rmr Message Id -> Command
 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
 
@@ -31,17 +32,21 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma
 }
 
 type Dispatcher struct {
-       rmrService              *rmr.Service
-       processResult           models.ProcessResult
+       rmrService    *rmr.Service
+       processResult models.ProcessResult
+       logger        *logger.Logger
+       jsonSender    *sender.JsonSender
 }
 
 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
        return d.processResult
 }
 
-func New(rmrService *rmr.Service) *Dispatcher {
+func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
        return &Dispatcher{
-               rmrService:              rmrService,
+               rmrService: rmrService,
+               logger:     logger,
+               jsonSender: jsonSender,
        }
 }
 
@@ -60,20 +65,32 @@ func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
 }
 
 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
-       err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+
+       if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+               now := time.Now()
+               d.processResult.StartTime = &now
+       }
+
+       err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
 
        if err != nil {
-               log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
+               d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
                d.processResult.Err = err
-               d.processResult.Stats.SentErrorCount++
+               d.processResult.Stats.SentErrorCount.Inc()
                return err
        }
 
-       d.processResult.Stats.SentCount++
+       d.processResult.Stats.SentCount.Inc()
        return nil
 }
 
 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+       if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+               now := time.Now()
+               d.processResult.StartTime = &now
+       }
+
        for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
 
                select {
@@ -82,15 +99,15 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm
                default:
                }
 
-               err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+               err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
 
                if err != nil {
-                       log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
-                       d.processResult.Stats.SentErrorCount++
+                       d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
+                       d.processResult.Stats.SentErrorCount.Inc()
                        continue
                }
 
-               d.processResult.Stats.SentCount++
+               d.processResult.Stats.SentCount.Inc()
                time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
        }
 }
@@ -212,6 +229,16 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom
        sendAndReceiveWg.Wait()
 }
 
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+       responseCommand, ok := configuration[command.SendCommandId]
+
+       if !ok {
+               return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+       }
+
+       return responseCommand, nil
+}
+
 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
        for {
                select {
@@ -223,24 +250,38 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
+                       d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
                        d.processResult.Err = err
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        return
                }
 
+               if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+                       now := time.Now()
+                       d.processResult.StartTime = &now
+               }
+
+               messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
 
                if len(command.SendCommandId) > 0 {
-                       responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
+                       responseCommand, err := getResponseCommand(command)
+
+                       if err != nil {
+                               d.processResult.Err = err
+                               return
+                       }
+
                        _ = d.sendNoRepeat(*responseCommand)
                }
 
@@ -249,7 +290,20 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models
 }
 
 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
-       for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
+
+       var responseCommand *models.JsonCommand
+
+       if len(command.SendCommandId) > 0 {
+               var err error
+               responseCommand, err = getResponseCommand(command)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+       }
+
+       for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
                select {
                case <-ctx.Done():
                        return
@@ -259,24 +313,30 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command mode
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
-                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        continue
                }
 
+               if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+                       now := time.Now()
+                       d.processResult.StartTime = &now
+               }
+
+               messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
 
-               if len(command.SendCommandId) > 0 {
-                       responseCommand := configuration[command.SendCommandId]
+               if responseCommand != nil {
                        _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
                }
        }