Copy latest code
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
diff --git a/tools/xappmock/dispatcher/dispatcher.go b/tools/xappmock/dispatcher/dispatcher.go
new file mode 100644 (file)
index 0000000..6e00592
--- /dev/null
@@ -0,0 +1,384 @@
+package dispatcher
+
+import (
+       "context"
+       "fmt"
+       "github.com/pkg/errors"
+       "reflect"
+       "sync"
+       "time"
+       "xappmock/enums"
+       "xappmock/logger"
+       "xappmock/models"
+       "xappmock/rmr"
+       "xappmock/sender"
+)
+
+// Id -> Command
+var configuration = make(map[string]*models.JsonCommand)
+
+// Rmr Message Id -> Command
+var waitForRmrMessageType = make(map[int]*models.JsonCommand)
+
+func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
+       rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
+
+       if err != nil {
+               return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
+       }
+
+       waitForRmrMessageType[int(rmrMsgId)] = &command
+       return nil
+}
+
+type Dispatcher struct {
+       rmrService    *rmr.Service
+       processResult models.ProcessResult
+       logger        *logger.Logger
+       jsonSender    *sender.JsonSender
+}
+
+func (d *Dispatcher) GetProcessResult() models.ProcessResult {
+       return d.processResult
+}
+
+func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
+       return &Dispatcher{
+               rmrService: rmrService,
+               logger:     logger,
+               jsonSender: jsonSender,
+       }
+}
+
+func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
+       if len(cmd.Id) == 0 {
+               return errors.New(fmt.Sprintf("invalid cmd, no id"))
+       }
+       configuration[cmd.Id] = &cmd
+       return nil
+
+       //      if len(cmd.ReceiveCommandId) == 0 {
+       //              return nil
+       //      }
+       //
+       //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
+}
+
+func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
+
+       if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+               now := time.Now()
+               d.processResult.StartTime = &now
+       }
+
+       err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
+
+       if err != nil {
+               d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
+               d.processResult.Err = err
+               d.processResult.Stats.SentErrorCount.Inc()
+               return err
+       }
+
+       d.processResult.Stats.SentCount.Inc()
+       return nil
+}
+
+func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+       if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+               now := time.Now()
+               d.processResult.StartTime = &now
+       }
+
+       for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
+
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+
+               err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
+
+               if err != nil {
+                       d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
+                       d.processResult.Stats.SentErrorCount.Inc()
+                       continue
+               }
+
+               d.processResult.Stats.SentCount.Inc()
+               time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
+       }
+}
+
+func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
+       command, ok := configuration[receiveCommandId]
+
+       if !ok {
+               return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
+       }
+
+       if len(command.RmrMessageType) == 0 {
+               return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
+       }
+
+       return command.RmrMessageType, nil
+}
+
+func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
+
+       defer sendAndReceiveWg.Done()
+       var listenAndHandleWg sync.WaitGroup
+
+       if len(command.ReceiveCommandId) > 0 {
+               rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+
+               err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+
+               listenAndHandleWg.Add(1)
+               go d.listenAndHandle(ctx, &listenAndHandleWg, command)
+       }
+
+       if command.RepeatCount == 0 {
+               err := d.sendNoRepeat(command)
+
+               if err != nil {
+                       return
+               }
+
+       } else {
+               d.sendWithRepeat(ctx, command)
+       }
+
+       if len(command.ReceiveCommandId) > 0 {
+               listenAndHandleWg.Wait()
+       }
+}
+
+func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
+
+       defer sendAndReceiveWg.Done()
+
+       err := addRmrMessageToWaitFor(command.RmrMessageType, command)
+
+       if err != nil {
+               d.processResult.Err = err
+               return
+       }
+
+       var listenAndHandleWg sync.WaitGroup
+       listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
+       d.listenAndHandle(ctx, &listenAndHandleWg, command)
+}
+
+func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
+       var command models.JsonCommand
+       if len(cmd.Id) == 0 {
+               return command, errors.New(fmt.Sprintf("invalid command, no id"))
+       }
+
+       command = *cmd
+
+       conf, ok := configuration[cmd.Id]
+
+       if ok {
+               command = *conf
+               mergeConfigurationAndCommand(&command, cmd)
+       }
+
+       return command, nil
+}
+
+func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
+
+       command, err := getMergedCommand(cmd)
+
+       if err != nil {
+               d.processResult.Err = err
+               return
+       }
+
+       var sendAndReceiveWg sync.WaitGroup
+
+       commandAction := enums.CommandAction(command.Action)
+
+       switch commandAction {
+
+       case enums.SendRmrMessage:
+               sendAndReceiveWg.Add(1)
+               go d.sendHandler(ctx, &sendAndReceiveWg, command)
+       case enums.ReceiveRmrMessage:
+               sendAndReceiveWg.Add(1)
+               go d.receiveHandler(ctx, &sendAndReceiveWg, command)
+       default:
+               d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
+               return
+       }
+
+       sendAndReceiveWg.Wait()
+}
+
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+       responseCommand, ok := configuration[command.SendCommandId]
+
+       if !ok {
+               return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+       }
+
+       return responseCommand, nil
+}
+
+func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+
+               mbuf, err := d.rmrService.RecvMessage()
+
+               if err != nil {
+                       d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
+                       d.processResult.Err = err
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
+                       return
+               }
+
+               if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+                       now := time.Now()
+                       d.processResult.StartTime = &now
+               }
+
+               messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
+               _, ok := waitForRmrMessageType[mbuf.MType]
+
+               if !ok {
+                       d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
+                       continue
+               }
+
+               d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+               if len(command.SendCommandId) > 0 {
+                       responseCommand, err := getResponseCommand(command)
+
+                       if err != nil {
+                               d.processResult.Err = err
+                               return
+                       }
+
+                       _ = d.sendNoRepeat(*responseCommand)
+               }
+
+               return
+       }
+}
+
+func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+       var responseCommand *models.JsonCommand
+
+       if len(command.SendCommandId) > 0 {
+               var err error
+               responseCommand, err = getResponseCommand(command)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+       }
+
+       for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+
+               mbuf, err := d.rmrService.RecvMessage()
+
+               if err != nil {
+                       d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
+                       continue
+               }
+
+               if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+                       now := time.Now()
+                       d.processResult.StartTime = &now
+               }
+
+               messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
+               _, ok := waitForRmrMessageType[mbuf.MType]
+
+               if !ok {
+                       d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
+                       continue
+               }
+
+               d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
+
+               if responseCommand != nil {
+                       _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
+               }
+       }
+}
+
+func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
+
+       defer listenAndHandleWg.Done()
+
+       if command.RepeatCount == 0 {
+               d.listenAndHandleNoRepeat(ctx, command)
+               return
+       }
+
+       d.listenAndHandleWithRepeat(ctx, command)
+}
+
+func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
+       nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
+
+       for i := 0; i < nFields; i++ {
+               if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
+                       switch fieldValue.Kind() {
+                       case reflect.String:
+                               if fieldValue.Len() > 0 {
+                                       reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+                               }
+                       case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+                               if fieldValue.Int() != 0 {
+                                       reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+                               }
+                       case reflect.Bool:
+                               if fieldValue.Bool() {
+                                       reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+                               }
+                       case reflect.Float64, reflect.Float32:
+                               if fieldValue.Float() != 0 {
+                                       reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+                               }
+                       default:
+                               reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
+                       }
+               }
+       }
+}