6 "github.com/pkg/errors"
// configuration maps a command Id to its JsonCommand. Entries are stored by
// JsonCommandsDecoderCB and looked up by getReceiveRmrMessageType,
// getMergedCommand and the listenAndHandle* functions.
// NOTE(review): this is a plain map with no locking visible in this file, yet it
// is read from goroutines started by ProcessJsonCommand — confirm that decoding
// always completes before processing starts.
18 var configuration = make(map[string]*models.JsonCommand)
// waitForRmrMessageType is keyed by the numeric RMR message type; entries are
// added by addRmrMessageToWaitFor and consulted for every received message.
19 // Rmr Message Id -> Command
20 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
22 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
23 rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
26 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
29 waitForRmrMessageType[int(rmrMsgId)] = &command
// Dispatcher executes JSON commands: it sends and receives RMR messages through
// rmrService and accumulates the outcome in processResult.
33 type Dispatcher struct {
// rmrService is handed to sender.SendJsonRmrMessage and used for RecvMessage.
34 rmrService *rmr.Service
// processResult carries the most recent error (Err) and the send/receive
// counters (Stats) updated by the methods below.
35 processResult models.ProcessResult
// GetProcessResult returns the dispatcher's current processResult by value.
38 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
39 return d.processResult
// New returns a Dispatcher whose rmrService field is set to the given service.
42 func New(rmrService *rmr.Service) *Dispatcher {
44 rmrService: rmrService,
// JsonCommandsDecoderCB stores cmd in configuration under cmd.Id.
// The "invalid cmd, no id" error below is the rejection path for a command
// without an id (the guard is not visible in this excerpt — presumably
// len(cmd.Id) == 0; confirm).
48 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
// NOTE(review): fmt.Sprintf is called with a constant string and no arguments;
// errors.New on the literal would be equivalent.
50 return errors.New(fmt.Sprintf("invalid cmd, no id"))
// cmd is a by-value parameter, so the stored pointer refers to this call's copy.
52 configuration[cmd.Id] = &cmd
// Disabled code: it would register cmd.ReceiveCommandId through
// addRmrMessageToWaitFor at decode time.
55 // if len(cmd.ReceiveCommandId) == 0 {
59 // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
// sendNoRepeat sends command once through sender.SendJsonRmrMessage and records
// the outcome in d.processResult.
62 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
63 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
// Failure bookkeeping: log the error, remember it in processResult.Err and bump
// SentErrorCount (presumably guarded by err != nil — the guard is not visible
// in this excerpt).
66 log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
67 d.processResult.Err = err
68 d.processResult.Stats.SentErrorCount++
// Success bookkeeping.
72 d.processResult.Stats.SentCount++
// sendWithRepeat sends command up to command.RepeatCount times, updating the
// sent/error counters in d.processResult and sleeping RepeatDelayInMs
// milliseconds after a counted send.
76 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
// Counts down from RepeatCount; one send attempt per iteration.
77 for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
85 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
// Unlike sendNoRepeat, a failure here is only logged and counted;
// processResult.Err is not set in the visible lines.
88 log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
89 d.processResult.Stats.SentErrorCount++
93 d.processResult.Stats.SentCount++
// NOTE(review): time.Sleep does not observe ctx, so a cancellation is noticed
// only after the current delay elapses — confirm this is acceptable.
94 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
98 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
99 command, ok := configuration[receiveCommandId]
102 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
105 if len(command.RmrMessageType) == 0 {
106 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
109 return command.RmrMessageType, nil
// sendHandler runs one send command. When the command names a
// ReceiveCommandId it first registers the expected RMR message type and starts
// listenAndHandle in a goroutine, then sends (once or repeatedly depending on
// RepeatCount) and finally waits for the listener.
112 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
// Signal the caller's WaitGroup when this handler returns.
114 defer sendAndReceiveWg.Done()
115 var listenAndHandleWg sync.WaitGroup
117 if len(command.ReceiveCommandId) > 0 {
// Resolve the receive command id to the RMR message type to wait for.
118 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
121 d.processResult.Err = err
125 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
128 d.processResult.Err = err
// The listener goroutine is started before the send below.
132 listenAndHandleWg.Add(1)
133 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
// RepeatCount == 0 selects the single-send path.
136 if command.RepeatCount == 0 {
137 err := d.sendNoRepeat(command)
144 d.sendWithRepeat(ctx, command)
// Only wait when a listener was started above.
147 if len(command.ReceiveCommandId) > 0 {
148 listenAndHandleWg.Wait()
// receiveHandler runs one receive command: it registers the command's own
// RmrMessageType as the type to wait for and then listens synchronously.
152 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
// Signal the caller's WaitGroup when this handler returns.
154 defer sendAndReceiveWg.Done()
156 err := addRmrMessageToWaitFor(command.RmrMessageType, command)
159 d.processResult.Err = err
// listenAndHandle always calls Done on the WaitGroup it is given, so a local
// one is created and incremented even though the call below is synchronous.
163 var listenAndHandleWg sync.WaitGroup
164 listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
165 d.listenAndHandle(ctx, &listenAndHandleWg, command)
// getMergedCommand builds the command to execute from cmd and the entry stored
// in configuration under cmd.Id, combining them with
// mergeConfigurationAndCommand. A cmd without an Id is rejected with an error.
// NOTE(review): the assignments to command around the configuration lookup are
// not visible in this excerpt — confirm which value is used when no
// configuration entry exists.
168 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
169 var command models.JsonCommand
170 if len(cmd.Id) == 0 {
// NOTE(review): fmt.Sprintf is called with a constant string and no arguments;
// errors.New on the literal would be equivalent.
171 return command, errors.New(fmt.Sprintf("invalid command, no id"))
176 conf, ok := configuration[cmd.Id]
// Overlay cmd's fields onto command (see mergeConfigurationAndCommand for the
// per-kind rules).
180 mergeConfigurationAndCommand(&command, cmd)
// ProcessJsonCommand merges cmd with its stored configuration, dispatches it to
// sendHandler or receiveHandler according to its Action, and waits for the
// handler goroutine to finish. Failures are reported through d.processResult
// (see GetProcessResult); the method itself returns nothing.
186 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
188 command, err := getMergedCommand(cmd)
191 d.processResult.Err = err
195 var sendAndReceiveWg sync.WaitGroup
197 commandAction := enums.CommandAction(command.Action)
199 switch commandAction {
// Each handler runs in its own goroutine and calls Done on sendAndReceiveWg.
201 case enums.SendRmrMessage:
202 sendAndReceiveWg.Add(1)
203 go d.sendHandler(ctx, &sendAndReceiveWg, command)
204 case enums.ReceiveRmrMessage:
205 sendAndReceiveWg.Add(1)
206 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
// NOTE(review): unlike the other error paths, which assign only
// processResult.Err, this replaces the whole processResult and therefore
// discards any Stats accumulated so far — confirm this is intended.
208 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
// Block until the started handler (if any) has returned.
212 sendAndReceiveWg.Wait()
// listenAndHandleNoRepeat receives RMR messages via d.rmrService.RecvMessage,
// classifies each by whether its MType is registered in waitForRmrMessageType,
// updates the receive counters, and — for an expected message when
// command.SendCommandId is set — sends the configured response command once.
// NOTE(review): the surrounding loop/ctx handling is not visible in this
// excerpt; the termination conditions should be confirmed against the full file.
215 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
223 mbuf, err := d.rmrService.RecvMessage()
// Receive failure: remember the error and count it.
226 d.processResult.Err = err
227 d.processResult.Stats.ReceivedErrorCount++
// Only membership matters here; the registered command itself is not used.
231 _, ok := waitForRmrMessageType[mbuf.MType]
234 log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
235 d.processResult.Stats.ReceivedUnexpectedCount++
239 log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
240 d.processResult.Stats.ReceivedExpectedCount++
242 if len(command.SendCommandId) > 0 {
// NOTE(review): the lookup result is not checked; if SendCommandId is not in
// configuration, responseCommand is nil and the dereference below panics.
243 responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
// The send error is deliberately dropped here; sendNoRepeat already records it
// in processResult.
244 _ = d.sendNoRepeat(*responseCommand)
// listenAndHandleWithRepeat keeps receiving RMR messages until the expected
// count reaches command.RepeatCount, updating the receive counters and sending
// the configured response command for each expected message when
// command.SendCommandId is set.
251 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
// NOTE(review): the bound is checked against the dispatcher-wide
// ReceivedExpectedCount, so expected messages counted before this call also
// count toward RepeatCount — confirm this is intended.
252 for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
259 mbuf, err := d.rmrService.RecvMessage()
// Receive failures are logged and counted; processResult.Err is not set in the
// visible lines.
262 log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
263 d.processResult.Stats.ReceivedErrorCount++
// Only membership matters here; the registered command itself is not used.
267 _, ok := waitForRmrMessageType[mbuf.MType]
270 log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
271 d.processResult.Stats.ReceivedUnexpectedCount++
275 log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
276 d.processResult.Stats.ReceivedExpectedCount++
278 if len(command.SendCommandId) > 0 {
// NOTE(review): the lookup result is not checked; if SendCommandId is not in
// configuration, responseCommand is nil and the dereference below panics.
279 responseCommand := configuration[command.SendCommandId]
280 _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
// listenAndHandle dispatches to listenAndHandleNoRepeat when
// command.RepeatCount is zero and to listenAndHandleWithRepeat otherwise
// (presumably mutually exclusive — the branch structure between the two calls
// is not visible in this excerpt). It calls Done on listenAndHandleWg on return.
285 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
// Callers must have called Add(1) on listenAndHandleWg before invoking this.
287 defer listenAndHandleWg.Done()
289 if command.RepeatCount == 0 {
290 d.listenAndHandleNoRepeat(ctx, command)
294 d.listenAndHandleWithRepeat(ctx, command)
// mergeConfigurationAndCommand overlays cmd onto conf field by field using
// reflection: for each struct field index, the value from cmd is copied into
// the same field of conf when it passes the per-kind check below. conf is
// modified in place; cmd is only read.
// NOTE(review): reflect's Set panics on fields that are not settable (e.g.
// unexported ones) — presumably every JsonCommand field is exported; confirm.
297 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
298 nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
300 for i := 0; i < nFields; i++ {
301 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
302 switch fieldValue.Kind() {
// Length-bearing kinds (the case label is not visible in this excerpt): copy
// only when non-empty.
304 if fieldValue.Len() > 0 {
305 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
// Signed integers: copy only when non-zero, so a zero in cmd never overrides
// conf.
307 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
308 if fieldValue.Int() != 0 {
309 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
// Booleans: copy only when true, so false in cmd never overrides conf.
312 if fieldValue.Bool() {
313 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
// Floats: copy only when non-zero.
315 case reflect.Float64, reflect.Float32:
316 if fieldValue.Float() != 0 {
317 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
// Unconditional copy — presumably the default branch for every other kind
// (unsigned integers are not listed above, so they would land here); confirm.
320 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)