X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=tools%2Fxappmock%2Fdispatcher%2Fdispatcher.go;h=6e00592bccfe182f136bb8fbd74911a71171c81f;hb=3da846d52012fe6fc60228c5e94928d3588eae6b;hp=1bf65f1e9da1a8fd8a6059f49b17819f2c076bbf;hpb=bcb124908ffd1de0c00868838bbac733b881fcb2;p=ric-plt%2Fe2mgr.git diff --git a/tools/xappmock/dispatcher/dispatcher.go b/tools/xappmock/dispatcher/dispatcher.go index 1bf65f1..6e00592 100644 --- a/tools/xappmock/dispatcher/dispatcher.go +++ b/tools/xappmock/dispatcher/dispatcher.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "github.com/pkg/errors" - "log" "reflect" "sync" "time" "xappmock/enums" + "xappmock/logger" "xappmock/models" "xappmock/rmr" "xappmock/sender" @@ -16,6 +16,7 @@ import ( // Id -> Command var configuration = make(map[string]*models.JsonCommand) + // Rmr Message Id -> Command var waitForRmrMessageType = make(map[int]*models.JsonCommand) @@ -33,15 +34,19 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma type Dispatcher struct { rmrService *rmr.Service processResult models.ProcessResult + logger *logger.Logger + jsonSender *sender.JsonSender } func (d *Dispatcher) GetProcessResult() models.ProcessResult { return d.processResult } -func New(rmrService *rmr.Service) *Dispatcher { +func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher { return &Dispatcher{ rmrService: rmrService, + logger: logger, + jsonSender: jsonSender, } } @@ -50,29 +55,42 @@ func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error { return errors.New(fmt.Sprintf("invalid cmd, no id")) } configuration[cmd.Id] = &cmd + return nil - if len(cmd.ReceiveRmrMessageType) == 0 { - return nil - } - - return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd) + // if len(cmd.ReceiveCommandId) == 0 { + // return nil + // } + // + // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd) } -func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) { - err := sender.SendJsonRmrMessage(command, nil, d.rmrService) +func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error { + + if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + + err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService) if err != nil { - log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err) + d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err) d.processResult.Err = err - d.processResult.Stats.SentErrorCount++ - return + d.processResult.Stats.SentErrorCount.Inc() + return err } - d.processResult.Stats.SentCount++ - + d.processResult.Stats.SentCount.Inc() + return nil } func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) { + + if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- { select { @@ -81,26 +99,47 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm default: } - err := sender.SendJsonRmrMessage(command, nil, d.rmrService) + err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService) if err != nil { - log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err) - d.processResult.Stats.SentErrorCount++ + d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err) + d.processResult.Stats.SentErrorCount.Inc() continue } - d.processResult.Stats.SentCount++ + d.processResult.Stats.SentCount.Inc() time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond) } } +func getReceiveRmrMessageType(receiveCommandId string) (string, error) { + command, ok := configuration[receiveCommandId] + + if !ok { + return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId)) + } + + if len(command.RmrMessageType) == 0 { + return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId)) + } + + return command.RmrMessageType, nil +} + func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) { defer sendAndReceiveWg.Done() var listenAndHandleWg sync.WaitGroup - if len(command.ReceiveRmrMessageType) > 0 { - err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command) + if len(command.ReceiveCommandId) > 0 { + rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId) + + if err != nil { + d.processResult.Err = err + return + } + + err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command) if err != nil { d.processResult.Err = err @@ -108,16 +147,21 @@ func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.Wai } listenAndHandleWg.Add(1) - go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount) + go d.listenAndHandle(ctx, &listenAndHandleWg, command) } if command.RepeatCount == 0 { - d.sendNoRepeat(command) + err := d.sendNoRepeat(command) + + if err != nil { + return + } + } else { d.sendWithRepeat(ctx, command) } - if len(command.ReceiveRmrMessageType) > 0 { + if len(command.ReceiveCommandId) > 0 { listenAndHandleWg.Wait() } } @@ -126,7 +170,7 @@ func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync. defer sendAndReceiveWg.Done() - err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command) + err := addRmrMessageToWaitFor(command.RmrMessageType, command) if err != nil { d.processResult.Err = err @@ -135,7 +179,7 @@ func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync. var listenAndHandleWg sync.WaitGroup listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case - d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount) + d.listenAndHandle(ctx, &listenAndHandleWg, command) } func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) { @@ -185,7 +229,17 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom sendAndReceiveWg.Wait() } -func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) { +func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) { + responseCommand, ok := configuration[command.SendCommandId] + + if !ok { + return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId)) + } + + return responseCommand, nil +} + +func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) { for { select { case <-ctx.Done(): @@ -196,31 +250,60 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) { mbuf, err := d.rmrService.RecvMessage() if err != nil { + d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err) d.processResult.Err = err - d.processResult.Stats.ReceivedErrorCount++ + d.processResult.Stats.ReceivedErrorCount.Inc() return } + if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + + messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction) + _, ok := waitForRmrMessageType[mbuf.MType] if !ok { - log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf) - d.processResult.Stats.ReceivedUnexpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo) + d.processResult.Stats.ReceivedUnexpectedCount.Inc() continue } - log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf) - d.processResult.Stats.ReceivedExpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo) + d.processResult.Stats.ReceivedExpectedCount.Inc() + + if len(command.SendCommandId) > 0 { + responseCommand, err := getResponseCommand(command) + + if err != nil { + d.processResult.Err = err + return + } + + _ = d.sendNoRepeat(*responseCommand) + } + return } } -func (d *Dispatcher) receive(ctx context.Context) { +func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) { -} + var responseCommand *models.JsonCommand + + if len(command.SendCommandId) > 0 { + var err error + responseCommand, err = getResponseCommand(command) + + if err != nil { + d.processResult.Err = err + return + } + } -func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) { - for d.processResult.Stats.ReceivedExpectedCount < repeatCount { + for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) { select { case <-ctx.Done(): return @@ -230,34 +313,45 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount mbuf, err := d.rmrService.RecvMessage() if err != nil { - log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err) - d.processResult.Stats.ReceivedErrorCount++ + d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err) + d.processResult.Stats.ReceivedErrorCount.Inc() continue } + if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + + messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction) + _, ok := waitForRmrMessageType[mbuf.MType] if !ok { - log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf) - d.processResult.Stats.ReceivedUnexpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo) + d.processResult.Stats.ReceivedUnexpectedCount.Inc() continue } - log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf) - d.processResult.Stats.ReceivedExpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo) + d.processResult.Stats.ReceivedExpectedCount.Inc() + + if responseCommand != nil { + _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling + } } } -func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) { +func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) { defer listenAndHandleWg.Done() - if repeatCount == 0 { - d.listenAndHandleNoRepeat(ctx) + if command.RepeatCount == 0 { + d.listenAndHandleNoRepeat(ctx, command) return } - d.listenAndHandleWithRepeat(ctx, repeatCount) + d.listenAndHandleWithRepeat(ctx, command) } func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {