"context"
"fmt"
"github.com/pkg/errors"
- "log"
"reflect"
"sync"
"time"
"xappmock/enums"
+ "xappmock/logger"
"xappmock/models"
"xappmock/rmr"
"xappmock/sender"
// Id -> Command
var configuration = make(map[string]*models.JsonCommand)
+
// Rmr Message Id -> Command
var waitForRmrMessageType = make(map[int]*models.JsonCommand)
type Dispatcher struct {
rmrService *rmr.Service
processResult models.ProcessResult
+ logger *logger.Logger
+ jsonSender *sender.JsonSender
}
func (d *Dispatcher) GetProcessResult() models.ProcessResult {
return d.processResult
}
-func New(rmrService *rmr.Service) *Dispatcher {
+func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
return &Dispatcher{
rmrService: rmrService,
+ logger: logger,
+ jsonSender: jsonSender,
}
}
return errors.New(fmt.Sprintf("invalid cmd, no id"))
}
configuration[cmd.Id] = &cmd
+ return nil
- if len(cmd.ReceiveRmrMessageType) == 0 {
- return nil
- }
-
- return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd)
+ // if len(cmd.ReceiveCommandId) == 0 {
+ // return nil
+ // }
+ //
+ // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
}
-func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) {
- err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
+
+ if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
if err != nil {
- log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
+ d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
d.processResult.Err = err
- d.processResult.Stats.SentErrorCount++
- return
+ d.processResult.Stats.SentErrorCount.Inc()
+ return err
}
- d.processResult.Stats.SentCount++
-
+ d.processResult.Stats.SentCount.Inc()
+ return nil
}
func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+ if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
select {
default:
}
- err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+ err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
if err != nil {
- log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
- d.processResult.Stats.SentErrorCount++
+ d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
+ d.processResult.Stats.SentErrorCount.Inc()
continue
}
- d.processResult.Stats.SentCount++
+ d.processResult.Stats.SentCount.Inc()
time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
}
}
+func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
+ command, ok := configuration[receiveCommandId]
+
+ if !ok {
+ return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
+ }
+
+ if len(command.RmrMessageType) == 0 {
+ return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
+ }
+
+ return command.RmrMessageType, nil
+}
+
func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
defer sendAndReceiveWg.Done()
var listenAndHandleWg sync.WaitGroup
- if len(command.ReceiveRmrMessageType) > 0 {
- err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+ if len(command.ReceiveCommandId) > 0 {
+ rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
if err != nil {
d.processResult.Err = err
}
listenAndHandleWg.Add(1)
- go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+ go d.listenAndHandle(ctx, &listenAndHandleWg, command)
}
if command.RepeatCount == 0 {
- d.sendNoRepeat(command)
+ err := d.sendNoRepeat(command)
+
+ if err != nil {
+ return
+ }
+
} else {
d.sendWithRepeat(ctx, command)
}
- if len(command.ReceiveRmrMessageType) > 0 {
+ if len(command.ReceiveCommandId) > 0 {
listenAndHandleWg.Wait()
}
}
defer sendAndReceiveWg.Done()
- err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+ err := addRmrMessageToWaitFor(command.RmrMessageType, command)
if err != nil {
d.processResult.Err = err
var listenAndHandleWg sync.WaitGroup
listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
- d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+ d.listenAndHandle(ctx, &listenAndHandleWg, command)
}
func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
sendAndReceiveWg.Wait()
}
-func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+ responseCommand, ok := configuration[command.SendCommandId]
+
+ if !ok {
+ return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+ }
+
+ return responseCommand, nil
+}
+
+func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
for {
select {
case <-ctx.Done():
mbuf, err := d.rmrService.RecvMessage()
if err != nil {
+ d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
d.processResult.Err = err
- d.processResult.Stats.ReceivedErrorCount++
+ d.processResult.Stats.ReceivedErrorCount.Inc()
return
}
+ if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
- d.processResult.Stats.ReceivedUnexpectedCount++
+ d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedUnexpectedCount.Inc()
continue
}
- log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
- d.processResult.Stats.ReceivedExpectedCount++
+ d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+ if len(command.SendCommandId) > 0 {
+ responseCommand, err := getResponseCommand(command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ _ = d.sendNoRepeat(*responseCommand)
+ }
+
return
}
}
-func (d *Dispatcher) receive(ctx context.Context) {
+func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
-}
+ var responseCommand *models.JsonCommand
+
+ if len(command.SendCommandId) > 0 {
+ var err error
+ responseCommand, err = getResponseCommand(command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+ }
-func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) {
- for d.processResult.Stats.ReceivedExpectedCount < repeatCount {
+ for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
select {
case <-ctx.Done():
return
mbuf, err := d.rmrService.RecvMessage()
if err != nil {
- log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err)
- d.processResult.Stats.ReceivedErrorCount++
+ d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
+ d.processResult.Stats.ReceivedErrorCount.Inc()
continue
}
+ if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
- d.processResult.Stats.ReceivedUnexpectedCount++
+ d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedUnexpectedCount.Inc()
continue
}
- log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
- d.processResult.Stats.ReceivedExpectedCount++
+ d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+ if responseCommand != nil {
+ _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
+ }
}
}
-func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) {
+func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
defer listenAndHandleWg.Done()
- if repeatCount == 0 {
- d.listenAndHandleNoRepeat(ctx)
+ if command.RepeatCount == 0 {
+ d.listenAndHandleNoRepeat(ctx, command)
return
}
- d.listenAndHandleWithRepeat(ctx, repeatCount)
+ d.listenAndHandleWithRepeat(ctx, command)
}
func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {