--- /dev/null
+package dispatcher
+
+import (
+ "context"
+ "fmt"
+ "github.com/pkg/errors"
+ "reflect"
+ "sync"
+ "time"
+ "xappmock/enums"
+ "xappmock/logger"
+ "xappmock/models"
+ "xappmock/rmr"
+ "xappmock/sender"
+)
+
+// Id -> Command
+var configuration = make(map[string]*models.JsonCommand)
+
+// Rmr Message Id -> Command
+var waitForRmrMessageType = make(map[int]*models.JsonCommand)
+
+func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
+ rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
+
+ if err != nil {
+ return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
+ }
+
+ waitForRmrMessageType[int(rmrMsgId)] = &command
+ return nil
+}
+
+type Dispatcher struct {
+ rmrService *rmr.Service
+ processResult models.ProcessResult
+ logger *logger.Logger
+ jsonSender *sender.JsonSender
+}
+
+func (d *Dispatcher) GetProcessResult() models.ProcessResult {
+ return d.processResult
+}
+
+func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
+ return &Dispatcher{
+ rmrService: rmrService,
+ logger: logger,
+ jsonSender: jsonSender,
+ }
+}
+
+func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
+ if len(cmd.Id) == 0 {
+ return errors.New(fmt.Sprintf("invalid cmd, no id"))
+ }
+ configuration[cmd.Id] = &cmd
+ return nil
+
+ // if len(cmd.ReceiveCommandId) == 0 {
+ // return nil
+ // }
+ //
+ // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
+}
+
+func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
+
+ if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
+
+ if err != nil {
+ d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
+ d.processResult.Err = err
+ d.processResult.Stats.SentErrorCount.Inc()
+ return err
+ }
+
+ d.processResult.Stats.SentCount.Inc()
+ return nil
+}
+
+func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+ if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
+
+ if err != nil {
+ d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
+ d.processResult.Stats.SentErrorCount.Inc()
+ continue
+ }
+
+ d.processResult.Stats.SentCount.Inc()
+ time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
+ }
+}
+
+func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
+ command, ok := configuration[receiveCommandId]
+
+ if !ok {
+ return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
+ }
+
+ if len(command.RmrMessageType) == 0 {
+ return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
+ }
+
+ return command.RmrMessageType, nil
+}
+
+func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
+
+ defer sendAndReceiveWg.Done()
+ var listenAndHandleWg sync.WaitGroup
+
+ if len(command.ReceiveCommandId) > 0 {
+ rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ listenAndHandleWg.Add(1)
+ go d.listenAndHandle(ctx, &listenAndHandleWg, command)
+ }
+
+ if command.RepeatCount == 0 {
+ err := d.sendNoRepeat(command)
+
+ if err != nil {
+ return
+ }
+
+ } else {
+ d.sendWithRepeat(ctx, command)
+ }
+
+ if len(command.ReceiveCommandId) > 0 {
+ listenAndHandleWg.Wait()
+ }
+}
+
+func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
+
+ defer sendAndReceiveWg.Done()
+
+ err := addRmrMessageToWaitFor(command.RmrMessageType, command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ var listenAndHandleWg sync.WaitGroup
+ listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
+ d.listenAndHandle(ctx, &listenAndHandleWg, command)
+}
+
+func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
+ var command models.JsonCommand
+ if len(cmd.Id) == 0 {
+ return command, errors.New(fmt.Sprintf("invalid command, no id"))
+ }
+
+ command = *cmd
+
+ conf, ok := configuration[cmd.Id]
+
+ if ok {
+ command = *conf
+ mergeConfigurationAndCommand(&command, cmd)
+ }
+
+ return command, nil
+}
+
+func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
+
+ command, err := getMergedCommand(cmd)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ var sendAndReceiveWg sync.WaitGroup
+
+ commandAction := enums.CommandAction(command.Action)
+
+ switch commandAction {
+
+ case enums.SendRmrMessage:
+ sendAndReceiveWg.Add(1)
+ go d.sendHandler(ctx, &sendAndReceiveWg, command)
+ case enums.ReceiveRmrMessage:
+ sendAndReceiveWg.Add(1)
+ go d.receiveHandler(ctx, &sendAndReceiveWg, command)
+ default:
+ d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
+ return
+ }
+
+ sendAndReceiveWg.Wait()
+}
+
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+ responseCommand, ok := configuration[command.SendCommandId]
+
+ if !ok {
+ return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+ }
+
+ return responseCommand, nil
+}
+
+func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ mbuf, err := d.rmrService.RecvMessage()
+
+ if err != nil {
+ d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
+ d.processResult.Err = err
+ d.processResult.Stats.ReceivedErrorCount.Inc()
+ return
+ }
+
+ if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
+ _, ok := waitForRmrMessageType[mbuf.MType]
+
+ if !ok {
+ d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedUnexpectedCount.Inc()
+ continue
+ }
+
+ d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+ if len(command.SendCommandId) > 0 {
+ responseCommand, err := getResponseCommand(command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ _ = d.sendNoRepeat(*responseCommand)
+ }
+
+ return
+ }
+}
+
+func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+ var responseCommand *models.JsonCommand
+
+ if len(command.SendCommandId) > 0 {
+ var err error
+ responseCommand, err = getResponseCommand(command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+ }
+
+ for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ mbuf, err := d.rmrService.RecvMessage()
+
+ if err != nil {
+ d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
+ d.processResult.Stats.ReceivedErrorCount.Inc()
+ continue
+ }
+
+ if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
+ _, ok := waitForRmrMessageType[mbuf.MType]
+
+ if !ok {
+ d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedUnexpectedCount.Inc()
+ continue
+ }
+
+ d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+ if responseCommand != nil {
+ _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
+ }
+ }
+}
+
+func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
+
+ defer listenAndHandleWg.Done()
+
+ if command.RepeatCount == 0 {
+ d.listenAndHandleNoRepeat(ctx, command)
+ return
+ }
+
+ d.listenAndHandleWithRepeat(ctx, command)
+}
+
+func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
+ nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
+
+ for i := 0; i < nFields; i++ {
+ if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
+ switch fieldValue.Kind() {
+ case reflect.String:
+ if fieldValue.Len() > 0 {
+ reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+ }
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ if fieldValue.Int() != 0 {
+ reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+ }
+ case reflect.Bool:
+ if fieldValue.Bool() {
+ reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+ }
+ case reflect.Float64, reflect.Float32:
+ if fieldValue.Float() != 0 {
+ reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+ }
+ default:
+ reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+ }
+ }
+ }
+}