6 "github.com/pkg/errors"
// configuration maps a command Id (string) to a pointer to the stored
// JsonCommand. It is written by JsonCommandsDecoderCB and read by the lookup
// helpers in this file.
// NOTE(review): this is a plain map with no visible locking, while handlers
// in this file run in goroutines — confirm access is serialised.
18 var configuration = make(map[string]*models.JsonCommand)
20 // Rmr Message Id -> Command
// waitForRmrMessageType is keyed by the integer RMR message id; the listen
// functions consult it with the MType of each received message to decide
// whether that message was expected.
21 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
// addRmrMessageToWaitFor converts rmrMessageToWaitFor to a numeric id with
// rmr.MessageIdToUint and registers command in waitForRmrMessageType under
// that id (cast to int).
//
// The "invalid rmr message id" error is presumably returned only when the
// conversion reports an error — confirm against the guard around it. Note
// that the conversion error itself is not included in the returned error.
23 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
24 rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
27 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
// command is a by-value parameter, so the stored pointer refers to this
// call's private copy, not to the caller's variable.
30 waitForRmrMessageType[int(rmrMsgId)] = &command
// Dispatcher sends and receives RMR messages through rmrService and
// accumulates the outcome (last error and counters) in processResult.
34 type Dispatcher struct {
// rmrService is the handle passed to sender.SendJsonRmrMessage and used for
// RecvMessage calls.
35 rmrService *rmr.Service
// processResult holds the Err and Stats fields updated by the send/receive
// methods of this type.
36 processResult models.ProcessResult
// GetProcessResult returns the dispatcher's current processResult. The value
// is returned by value (not as a pointer), so the caller receives a copy.
39 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
40 return d.processResult
// New returns a *Dispatcher whose rmrService field is set to the given
// rmrService.
43 func New(rmrService *rmr.Service) *Dispatcher {
45 rmrService: rmrService,
// JsonCommandsDecoderCB records cmd in the package-level configuration map
// under cmd.Id.
//
// The "invalid cmd, no id" error is presumably returned when cmd.Id is empty
// — confirm against the guard around it.
// NOTE(review): fmt.Sprintf is called with no formatting arguments below, so
// it adds nothing over passing the literal string directly.
49 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
51 return errors.New(fmt.Sprintf("invalid cmd, no id"))
// cmd is a by-value parameter; the map keeps a pointer to this call's copy.
53 configuration[cmd.Id] = &cmd
56 // if len(cmd.ReceiveCommandId) == 0 {
60 // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
// sendNoRepeat sends command once via sender.SendJsonRmrMessage using the
// dispatcher's rmrService.
//
// The visible statements log the send error, store it in d.processResult.Err
// and increment Stats.SentErrorCount; a separate statement increments
// Stats.SentCount. Presumably the first group runs only on failure and the
// SentCount increment only on success — confirm against the error guard.
63 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
64 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
67 log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
68 d.processResult.Err = err
69 d.processResult.Stats.SentErrorCount.Inc()
73 d.processResult.Stats.SentCount.Inc()
// sendWithRepeat sends command repeatedly, counting down from
// command.RepeatCount to 1, and sleeps command.RepeatDelayInMs milliseconds
// after updating SentCount.
//
// Unlike sendNoRepeat, the visible error handling does not store the error
// in d.processResult.Err; it only logs and increments Stats.SentErrorCount.
// NOTE(review): how ctx is consulted inside the loop is not visible here —
// confirm cancellation is honoured, including during the time.Sleep.
77 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
78 for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
86 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
89 log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
90 d.processResult.Stats.SentErrorCount.Inc()
94 d.processResult.Stats.SentCount.Inc()
// Delay between consecutive sends, expressed in milliseconds.
95 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
// getReceiveRmrMessageType looks up receiveCommandId in the configuration
// map and returns that command's RmrMessageType.
//
// It returns an error when the stored command has an empty RmrMessageType.
// The "invalid receive command id" error is presumably returned when the
// lookup misses (ok is false) — confirm against the guard around it.
99 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
100 command, ok := configuration[receiveCommandId]
103 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
106 if len(command.RmrMessageType) == 0 {
107 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
110 return command.RmrMessageType, nil
// sendHandler performs a send command and, when command.ReceiveCommandId is
// set, also listens for the matching reply.
//
// sendAndReceiveWg.Done is deferred, so the caller's WaitGroup is released
// when this function returns. Errors from resolving or registering the
// message type to wait for are stored in d.processResult.Err; presumably the
// function returns early in those cases — confirm against the guards.
113 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
115 defer sendAndReceiveWg.Done()
116 var listenAndHandleWg sync.WaitGroup
118 if len(command.ReceiveCommandId) > 0 {
// Resolve the RMR message type configured for the receive command, then
// register it as an expected message type.
119 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
122 d.processResult.Err = err
126 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
129 d.processResult.Err = err
// The listener goroutine is started before anything is sent.
133 listenAndHandleWg.Add(1)
134 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
// RepeatCount == 0 selects the single-send path.
137 if command.RepeatCount == 0 {
138 err := d.sendNoRepeat(command)
145 d.sendWithRepeat(ctx, command)
// Wait for the listener only when one was started above.
148 if len(command.ReceiveCommandId) > 0 {
149 listenAndHandleWg.Wait()
// receiveHandler performs a receive command: it registers
// command.RmrMessageType as an expected message type and then runs
// listenAndHandle synchronously (no goroutine) in this function.
//
// sendAndReceiveWg.Done is deferred. A registration error is stored in
// d.processResult.Err; presumably the function returns early in that case —
// confirm against the guard.
153 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
155 defer sendAndReceiveWg.Done()
157 err := addRmrMessageToWaitFor(command.RmrMessageType, command)
160 d.processResult.Err = err
// listenAndHandle calls Done on the WaitGroup it is given, so a local one
// with a count of 1 is supplied even though nothing waits on it here.
164 var listenAndHandleWg sync.WaitGroup
165 listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
166 d.listenAndHandle(ctx, &listenAndHandleWg, command)
// getMergedCommand builds the effective command for cmd by combining it with
// the stored configuration entry for cmd.Id.
//
// It returns an error (and a zero-value JsonCommand) when cmd.Id is empty.
// NOTE(review): how conf/ok from the configuration lookup are used, and what
// is finally returned, is not visible here — presumably command is seeded
// from conf before cmd's fields are merged over it; confirm.
// NOTE(review): fmt.Sprintf is called with no formatting arguments below.
169 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
170 var command models.JsonCommand
171 if len(cmd.Id) == 0 {
172 return command, errors.New(fmt.Sprintf("invalid command, no id"))
177 conf, ok := configuration[cmd.Id]
// Copies the set (non-zero) fields of cmd into command.
181 mergeConfigurationAndCommand(&command, cmd)
// ProcessJsonCommand resolves cmd through getMergedCommand and dispatches it
// by command.Action: SendRmrMessage runs sendHandler and ReceiveRmrMessage
// runs receiveHandler, each in its own goroutine. It then blocks on
// sendAndReceiveWg until the started handler has finished.
//
// A merge error is stored in d.processResult.Err; presumably the function
// returns early in that case — confirm against the guard.
187 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
189 command, err := getMergedCommand(cmd)
192 d.processResult.Err = err
196 var sendAndReceiveWg sync.WaitGroup
198 commandAction := enums.CommandAction(command.Action)
200 switch commandAction {
202 case enums.SendRmrMessage:
203 sendAndReceiveWg.Add(1)
204 go d.sendHandler(ctx, &sendAndReceiveWg, command)
205 case enums.ReceiveRmrMessage:
206 sendAndReceiveWg.Add(1)
207 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
// Presumably the fall-through branch for an unrecognised action. This
// assignment replaces the whole processResult with a new value that has
// only Err set, so any previously accumulated Stats are discarded.
209 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
213 sendAndReceiveWg.Wait()
// getResponseCommand looks up command.SendCommandId in the configuration map
// and returns the stored command pointer.
//
// The "invalid SendCommandId" error is presumably returned when the lookup
// misses (ok is false) — confirm against the guard around it.
216 func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
217 responseCommand, ok := configuration[command.SendCommandId]
220 return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
223 return responseCommand, nil
// listenAndHandleNoRepeat receives messages from d.rmrService and classifies
// each by whether its MType is registered in waitForRmrMessageType.
//
// Visible effects: a receive error is logged, stored in d.processResult.Err
// and counted in Stats.ReceivedErrorCount; unregistered message types bump
// Stats.ReceivedUnexpectedCount; registered ones bump
// Stats.ReceivedExpectedCount. When command.SendCommandId is set, the
// configured response command is sent with sendNoRepeat.
// NOTE(review): the loop structure and any use of ctx are not visible here —
// confirm how the function terminates and whether cancellation is honoured.
226 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
234 mbuf, err := d.rmrService.RecvMessage()
237 log.Printf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
238 d.processResult.Err = err
239 d.processResult.Stats.ReceivedErrorCount.Inc()
243 messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
// Only key presence matters; the stored command is not used.
245 _, ok := waitForRmrMessageType[mbuf.MType]
248 log.Printf("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
249 d.processResult.Stats.ReceivedUnexpectedCount.Inc()
253 log.Printf("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
254 d.processResult.Stats.ReceivedExpectedCount.Inc()
256 if len(command.SendCommandId) > 0 {
257 responseCommand, err := getResponseCommand(command)
260 d.processResult.Err = err
// The returned error is dropped here; sendNoRepeat has already logged it
// and recorded it in d.processResult.
264 _ = d.sendNoRepeat(*responseCommand)
// listenAndHandleWithRepeat receives messages until
// Stats.ReceivedExpectedCount reaches command.RepeatCount, optionally sending
// a response command after each expected message.
//
// The response command is resolved once, before the loop, when
// command.SendCommandId is set; a resolution error is stored in
// d.processResult.Err. Receive errors are logged and counted but, unlike in
// listenAndHandleNoRepeat, the visible code does not store them in Err.
// NOTE(review): the loop bound reads a counter that lives on the Dispatcher,
// so it depends on whatever value the counter already holds when this starts
// — confirm it is reset between commands.
// NOTE(review): any use of ctx inside the loop is not visible here.
271 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
273 var responseCommand *models.JsonCommand
275 if len(command.SendCommandId) > 0 {
277 responseCommand, err = getResponseCommand(command)
280 d.processResult.Err = err
285 for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
292 mbuf, err := d.rmrService.RecvMessage()
295 log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
296 d.processResult.Stats.ReceivedErrorCount.Inc()
300 messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
// Only key presence matters; the stored command is not used.
302 _, ok := waitForRmrMessageType[mbuf.MType]
305 log.Printf("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
306 d.processResult.Stats.ReceivedUnexpectedCount.Inc()
310 log.Printf("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
311 d.processResult.Stats.ReceivedExpectedCount.Inc()
// responseCommand stays nil when no SendCommandId was configured.
313 if responseCommand != nil {
314 _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
// listenAndHandle selects the receive strategy for command:
// listenAndHandleNoRepeat when command.RepeatCount is 0, and presumably
// listenAndHandleWithRepeat otherwise (the branch separating the two calls is
// only partly visible). listenAndHandleWg.Done is deferred, so the caller
// must have called Add(1) on that WaitGroup beforehand.
319 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
321 defer listenAndHandleWg.Done()
323 if command.RepeatCount == 0 {
324 d.listenAndHandleNoRepeat(ctx, command)
328 d.listenAndHandleWithRepeat(ctx, command)
// mergeConfigurationAndCommand copies fields from *cmd into *conf using
// reflection, field by field and by index (both arguments have the same
// struct type, so indices line up).
//
// In the visible branches a field is copied only when it carries a value:
// Len() > 0, a non-zero integer, a true bool, or a non-zero float. As a
// consequence a zero, false or empty value in cmd cannot overwrite what conf
// already holds for those kinds.
// NOTE(review): several case labels are not visible here, so which kinds the
// Len() branch, the Bool() branch and the final unconditional Set apply to
// is presumed (string-like, bool, and a default case respectively) — confirm.
331 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
332 nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
334 for i := 0; i < nFields; i++ {
335 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
336 switch fieldValue.Kind() {
338 if fieldValue.Len() > 0 {
339 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
341 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
342 if fieldValue.Int() != 0 {
343 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
346 if fieldValue.Bool() {
347 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
349 case reflect.Float64, reflect.Float32:
350 if fieldValue.Float() != 0 {
351 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
354 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)