6 "github.com/pkg/errors"
18 var configuration = make(map[string]*models.JsonCommand)
20 // Rmr Message Id -> Command
21 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
23 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
24 rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
27 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
30 waitForRmrMessageType[int(rmrMsgId)] = &command
34 type Dispatcher struct {
35 rmrService *rmr.Service
36 processResult models.ProcessResult
38 jsonSender *sender.JsonSender
41 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
42 return d.processResult
45 func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
47 rmrService: rmrService,
49 jsonSender: jsonSender,
53 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
55 return errors.New(fmt.Sprintf("invalid cmd, no id"))
57 configuration[cmd.Id] = &cmd
60 // if len(cmd.ReceiveCommandId) == 0 {
64 // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
67 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
69 if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
71 d.processResult.StartTime = &now
74 err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
77 d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
78 d.processResult.Err = err
79 d.processResult.Stats.SentErrorCount.Inc()
83 d.processResult.Stats.SentCount.Inc()
87 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
89 if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
91 d.processResult.StartTime = &now
94 for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
102 err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
105 d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
106 d.processResult.Stats.SentErrorCount.Inc()
110 d.processResult.Stats.SentCount.Inc()
111 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
115 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
116 command, ok := configuration[receiveCommandId]
119 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
122 if len(command.RmrMessageType) == 0 {
123 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
126 return command.RmrMessageType, nil
129 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
131 defer sendAndReceiveWg.Done()
132 var listenAndHandleWg sync.WaitGroup
134 if len(command.ReceiveCommandId) > 0 {
135 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
138 d.processResult.Err = err
142 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
145 d.processResult.Err = err
149 listenAndHandleWg.Add(1)
150 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
153 if command.RepeatCount == 0 {
154 err := d.sendNoRepeat(command)
161 d.sendWithRepeat(ctx, command)
164 if len(command.ReceiveCommandId) > 0 {
165 listenAndHandleWg.Wait()
169 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
171 defer sendAndReceiveWg.Done()
173 err := addRmrMessageToWaitFor(command.RmrMessageType, command)
176 d.processResult.Err = err
180 var listenAndHandleWg sync.WaitGroup
181 listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
182 d.listenAndHandle(ctx, &listenAndHandleWg, command)
185 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
186 var command models.JsonCommand
187 if len(cmd.Id) == 0 {
188 return command, errors.New(fmt.Sprintf("invalid command, no id"))
193 conf, ok := configuration[cmd.Id]
197 mergeConfigurationAndCommand(&command, cmd)
203 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
205 command, err := getMergedCommand(cmd)
208 d.processResult.Err = err
212 var sendAndReceiveWg sync.WaitGroup
214 commandAction := enums.CommandAction(command.Action)
216 switch commandAction {
218 case enums.SendRmrMessage:
219 sendAndReceiveWg.Add(1)
220 go d.sendHandler(ctx, &sendAndReceiveWg, command)
221 case enums.ReceiveRmrMessage:
222 sendAndReceiveWg.Add(1)
223 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
225 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
229 sendAndReceiveWg.Wait()
232 func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
233 responseCommand, ok := configuration[command.SendCommandId]
236 return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
239 return responseCommand, nil
242 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
250 mbuf, err := d.rmrService.RecvMessage()
253 d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
254 d.processResult.Err = err
255 d.processResult.Stats.ReceivedErrorCount.Inc()
259 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
261 d.processResult.StartTime = &now
264 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
266 _, ok := waitForRmrMessageType[mbuf.MType]
269 d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
270 d.processResult.Stats.ReceivedUnexpectedCount.Inc()
274 d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
275 d.processResult.Stats.ReceivedExpectedCount.Inc()
277 if len(command.SendCommandId) > 0 {
278 responseCommand, err := getResponseCommand(command)
281 d.processResult.Err = err
285 _ = d.sendNoRepeat(*responseCommand)
292 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
294 var responseCommand *models.JsonCommand
296 if len(command.SendCommandId) > 0 {
298 responseCommand, err = getResponseCommand(command)
301 d.processResult.Err = err
306 for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
313 mbuf, err := d.rmrService.RecvMessage()
316 d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
317 d.processResult.Stats.ReceivedErrorCount.Inc()
321 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
323 d.processResult.StartTime = &now
326 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
328 _, ok := waitForRmrMessageType[mbuf.MType]
331 d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
332 d.processResult.Stats.ReceivedUnexpectedCount.Inc()
336 d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
337 d.processResult.Stats.ReceivedExpectedCount.Inc()
339 if responseCommand != nil {
340 _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
345 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
347 defer listenAndHandleWg.Done()
349 if command.RepeatCount == 0 {
350 d.listenAndHandleNoRepeat(ctx, command)
354 d.listenAndHandleWithRepeat(ctx, command)
357 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
358 nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
360 for i := 0; i < nFields; i++ {
361 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
362 switch fieldValue.Kind() {
364 if fieldValue.Len() > 0 {
365 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
367 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
368 if fieldValue.Int() != 0 {
369 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
372 if fieldValue.Bool() {
373 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
375 case reflect.Float64, reflect.Float32:
376 if fieldValue.Float() != 0 {
377 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
380 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)