X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=tools%2Fxappmock%2Fdispatcher%2Fdispatcher.go;h=6e00592bccfe182f136bb8fbd74911a71171c81f;hb=3da846d52012fe6fc60228c5e94928d3588eae6b;hp=7243cdea815554d1577d4ec624729ebeb21f1520;hpb=bd8558c49e0d7189c90ea837a1ee4e0ca41b8dd1;p=ric-plt%2Fe2mgr.git diff --git a/tools/xappmock/dispatcher/dispatcher.go b/tools/xappmock/dispatcher/dispatcher.go index 7243cde..6e00592 100644 --- a/tools/xappmock/dispatcher/dispatcher.go +++ b/tools/xappmock/dispatcher/dispatcher.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "github.com/pkg/errors" - "log" "reflect" "sync" "time" "xappmock/enums" + "xappmock/logger" "xappmock/models" "xappmock/rmr" "xappmock/sender" @@ -16,6 +16,7 @@ import ( // Id -> Command var configuration = make(map[string]*models.JsonCommand) + // Rmr Message Id -> Command var waitForRmrMessageType = make(map[int]*models.JsonCommand) @@ -31,17 +32,21 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma } type Dispatcher struct { - rmrService *rmr.Service - processResult models.ProcessResult + rmrService *rmr.Service + processResult models.ProcessResult + logger *logger.Logger + jsonSender *sender.JsonSender } func (d *Dispatcher) GetProcessResult() models.ProcessResult { return d.processResult } -func New(rmrService *rmr.Service) *Dispatcher { +func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher { return &Dispatcher{ - rmrService: rmrService, + rmrService: rmrService, + logger: logger, + jsonSender: jsonSender, } } @@ -60,20 +65,32 @@ func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error { } func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error { - err := sender.SendJsonRmrMessage(command, nil, d.rmrService) + + if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + + err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService) if err != nil { - log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err) + d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err) d.processResult.Err = err - d.processResult.Stats.SentErrorCount++ + d.processResult.Stats.SentErrorCount.Inc() return err } - d.processResult.Stats.SentCount++ + d.processResult.Stats.SentCount.Inc() return nil } func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) { + + if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- { select { @@ -82,15 +99,15 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm default: } - err := sender.SendJsonRmrMessage(command, nil, d.rmrService) + err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService) if err != nil { - log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err) - d.processResult.Stats.SentErrorCount++ + d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err) + d.processResult.Stats.SentErrorCount.Inc() continue } - d.processResult.Stats.SentCount++ + d.processResult.Stats.SentCount.Inc() time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond) } } @@ -212,6 +229,16 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom sendAndReceiveWg.Wait() } +func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) { + responseCommand, ok := configuration[command.SendCommandId] + + if !ok { + return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId)) + } + + return responseCommand, nil +} + func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) { for { select { @@ -223,24 +250,38 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models mbuf, err := d.rmrService.RecvMessage() if err != nil { + d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err) d.processResult.Err = err - d.processResult.Stats.ReceivedErrorCount++ + d.processResult.Stats.ReceivedErrorCount.Inc() return } + if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + + messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction) + _, ok := waitForRmrMessageType[mbuf.MType] if !ok { - log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf) - d.processResult.Stats.ReceivedUnexpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo) + d.processResult.Stats.ReceivedUnexpectedCount.Inc() continue } - log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf) - d.processResult.Stats.ReceivedExpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo) + d.processResult.Stats.ReceivedExpectedCount.Inc() if len(command.SendCommandId) > 0 { - responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand + responseCommand, err := getResponseCommand(command) + + if err != nil { + d.processResult.Err = err + return + } + _ = d.sendNoRepeat(*responseCommand) } @@ -249,7 +290,20 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models } func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) { - for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount { + + var responseCommand *models.JsonCommand + + if len(command.SendCommandId) > 0 { + var err error + responseCommand, err = getResponseCommand(command) + + if err != nil { + d.processResult.Err = err + return + } + } + + for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) { select { case <-ctx.Done(): return @@ -259,24 +313,30 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command mode mbuf, err := d.rmrService.RecvMessage() if err != nil { - log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err) - d.processResult.Stats.ReceivedErrorCount++ + d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err) + d.processResult.Stats.ReceivedErrorCount.Inc() continue } + if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil { + now := time.Now() + d.processResult.StartTime = &now + } + + messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction) + _, ok := waitForRmrMessageType[mbuf.MType] if !ok { - log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf) - d.processResult.Stats.ReceivedUnexpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo) + d.processResult.Stats.ReceivedUnexpectedCount.Inc() continue } - log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf) - d.processResult.Stats.ReceivedExpectedCount++ + d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo) + d.processResult.Stats.ReceivedExpectedCount.Inc() - if len(command.SendCommandId) > 0 { - responseCommand := configuration[command.SendCommandId] + if responseCommand != nil { _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling } }