[RICPLT-2813] Update Swagger: /e2t/list
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
index 1bf65f1..6e00592 100644 (file)
@@ -4,11 +4,11 @@ import (
        "context"
        "fmt"
        "github.com/pkg/errors"
-       "log"
        "reflect"
        "sync"
        "time"
        "xappmock/enums"
+       "xappmock/logger"
        "xappmock/models"
        "xappmock/rmr"
        "xappmock/sender"
@@ -16,6 +16,7 @@ import (
 
 // Id -> Command
 var configuration = make(map[string]*models.JsonCommand)
+
 // Rmr Message Id -> Command
 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
 
@@ -33,15 +34,19 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma
 type Dispatcher struct {
        rmrService    *rmr.Service
        processResult models.ProcessResult
+       logger        *logger.Logger
+       jsonSender    *sender.JsonSender
 }
 
 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
        return d.processResult
 }
 
-func New(rmrService *rmr.Service) *Dispatcher {
+func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
        return &Dispatcher{
                rmrService: rmrService,
+               logger:     logger,
+               jsonSender: jsonSender,
        }
 }
 
@@ -50,29 +55,42 @@ func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
                return errors.New(fmt.Sprintf("invalid cmd, no id"))
        }
        configuration[cmd.Id] = &cmd
+       return nil
 
-       if len(cmd.ReceiveRmrMessageType) == 0 {
-               return nil
-       }
-
-       return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd)
+       //      if len(cmd.ReceiveCommandId) == 0 {
+       //              return nil
+       //      }
+       //
+       //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
 }
 
-func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) {
-       err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
+
+       if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+               now := time.Now()
+               d.processResult.StartTime = &now
+       }
+
+       err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
 
        if err != nil {
-               log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
+               d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
                d.processResult.Err = err
-               d.processResult.Stats.SentErrorCount++
-               return
+               d.processResult.Stats.SentErrorCount.Inc()
+               return err
        }
 
-       d.processResult.Stats.SentCount++
-
+       d.processResult.Stats.SentCount.Inc()
+       return nil
 }
 
 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+       if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+               now := time.Now()
+               d.processResult.StartTime = &now
+       }
+
        for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
 
                select {
@@ -81,26 +99,47 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm
                default:
                }
 
-               err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+               err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
 
                if err != nil {
-                       log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
-                       d.processResult.Stats.SentErrorCount++
+                       d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
+                       d.processResult.Stats.SentErrorCount.Inc()
                        continue
                }
 
-               d.processResult.Stats.SentCount++
+               d.processResult.Stats.SentCount.Inc()
                time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
        }
 }
 
+func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
+       command, ok := configuration[receiveCommandId]
+
+       if !ok {
+               return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
+       }
+
+       if len(command.RmrMessageType) == 0 {
+               return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
+       }
+
+       return command.RmrMessageType, nil
+}
+
 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
 
        defer sendAndReceiveWg.Done()
        var listenAndHandleWg sync.WaitGroup
 
-       if len(command.ReceiveRmrMessageType) > 0 {
-               err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+       if len(command.ReceiveCommandId) > 0 {
+               rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+
+               err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
 
                if err != nil {
                        d.processResult.Err = err
@@ -108,16 +147,21 @@ func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.Wai
                }
 
                listenAndHandleWg.Add(1)
-               go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+               go d.listenAndHandle(ctx, &listenAndHandleWg, command)
        }
 
        if command.RepeatCount == 0 {
-               d.sendNoRepeat(command)
+               err := d.sendNoRepeat(command)
+
+               if err != nil {
+                       return
+               }
+
        } else {
                d.sendWithRepeat(ctx, command)
        }
 
-       if len(command.ReceiveRmrMessageType) > 0 {
+       if len(command.ReceiveCommandId) > 0 {
                listenAndHandleWg.Wait()
        }
 }
@@ -126,7 +170,7 @@ func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.
 
        defer sendAndReceiveWg.Done()
 
-       err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+       err := addRmrMessageToWaitFor(command.RmrMessageType, command)
 
        if err != nil {
                d.processResult.Err = err
@@ -135,7 +179,7 @@ func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.
 
        var listenAndHandleWg sync.WaitGroup
        listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
-       d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+       d.listenAndHandle(ctx, &listenAndHandleWg, command)
 }
 
 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
@@ -185,7 +229,17 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom
        sendAndReceiveWg.Wait()
 }
 
-func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+       responseCommand, ok := configuration[command.SendCommandId]
+
+       if !ok {
+               return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+       }
+
+       return responseCommand, nil
+}
+
+func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
        for {
                select {
                case <-ctx.Done():
@@ -196,31 +250,60 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
+                       d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
                        d.processResult.Err = err
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        return
                }
 
+               if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+                       now := time.Now()
+                       d.processResult.StartTime = &now
+               }
+
+               messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+               if len(command.SendCommandId) > 0 {
+                       responseCommand, err := getResponseCommand(command)
+
+                       if err != nil {
+                               d.processResult.Err = err
+                               return
+                       }
+
+                       _ = d.sendNoRepeat(*responseCommand)
+               }
+
                return
        }
 }
 
-func (d *Dispatcher) receive(ctx context.Context) {
+func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
 
-}
+       var responseCommand *models.JsonCommand
+
+       if len(command.SendCommandId) > 0 {
+               var err error
+               responseCommand, err = getResponseCommand(command)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+       }
 
-func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) {
-       for d.processResult.Stats.ReceivedExpectedCount < repeatCount {
+       for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
                select {
                case <-ctx.Done():
                        return
@@ -230,34 +313,45 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
-                       log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err)
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        continue
                }
 
+               if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+                       now := time.Now()
+                       d.processResult.StartTime = &now
+               }
+
+               messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+               if responseCommand != nil {
+                       _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
+               }
        }
 }
 
-func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) {
+func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
 
        defer listenAndHandleWg.Done()
 
-       if repeatCount == 0 {
-               d.listenAndHandleNoRepeat(ctx)
+       if command.RepeatCount == 0 {
+               d.listenAndHandleNoRepeat(ctx, command)
                return
        }
 
-       d.listenAndHandleWithRepeat(ctx, repeatCount)
+       d.listenAndHandleWithRepeat(ctx, command)
 }
 
 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {