6 "github.com/pkg/errors"
18 var configuration = make(map[string]*models.JsonCommand)
19 // Rmr Message Id -> Command
20 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
22 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
23 rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
26 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
29 waitForRmrMessageType[int(rmrMsgId)] = &command
33 type Dispatcher struct {
34 rmrService *rmr.Service
35 processResult models.ProcessResult
38 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
39 return d.processResult
42 func New(rmrService *rmr.Service) *Dispatcher {
44 rmrService: rmrService,
48 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
50 return errors.New(fmt.Sprintf("invalid cmd, no id"))
52 configuration[cmd.Id] = &cmd
54 if len(cmd.ReceiveRmrMessageType) == 0 {
58 return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd)
61 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) {
62 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
65 log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
66 d.processResult.Err = err
67 d.processResult.Stats.SentErrorCount++
71 d.processResult.Stats.SentCount++
75 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
76 for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
84 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
87 log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
88 d.processResult.Stats.SentErrorCount++
92 d.processResult.Stats.SentCount++
93 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
97 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
99 defer sendAndReceiveWg.Done()
100 var listenAndHandleWg sync.WaitGroup
102 if len(command.ReceiveRmrMessageType) > 0 {
103 err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
106 d.processResult.Err = err
110 listenAndHandleWg.Add(1)
111 go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
114 if command.RepeatCount == 0 {
115 d.sendNoRepeat(command)
117 d.sendWithRepeat(ctx, command)
120 if len(command.ReceiveRmrMessageType) > 0 {
121 listenAndHandleWg.Wait()
125 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
127 defer sendAndReceiveWg.Done()
129 err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
132 d.processResult.Err = err
136 var listenAndHandleWg sync.WaitGroup
137 listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
138 d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
141 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
142 var command models.JsonCommand
143 if len(cmd.Id) == 0 {
144 return command, errors.New(fmt.Sprintf("invalid command, no id"))
149 conf, ok := configuration[cmd.Id]
153 mergeConfigurationAndCommand(&command, cmd)
159 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
161 command, err := getMergedCommand(cmd)
164 d.processResult.Err = err
168 var sendAndReceiveWg sync.WaitGroup
170 commandAction := enums.CommandAction(command.Action)
172 switch commandAction {
174 case enums.SendRmrMessage:
175 sendAndReceiveWg.Add(1)
176 go d.sendHandler(ctx, &sendAndReceiveWg, command)
177 case enums.ReceiveRmrMessage:
178 sendAndReceiveWg.Add(1)
179 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
181 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
185 sendAndReceiveWg.Wait()
188 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
196 mbuf, err := d.rmrService.RecvMessage()
199 d.processResult.Err = err
200 d.processResult.Stats.ReceivedErrorCount++
204 _, ok := waitForRmrMessageType[mbuf.MType]
207 log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
208 d.processResult.Stats.ReceivedUnexpectedCount++
212 log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
213 d.processResult.Stats.ReceivedExpectedCount++
218 func (d *Dispatcher) receive(ctx context.Context) {
222 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) {
223 for d.processResult.Stats.ReceivedExpectedCount < repeatCount {
230 mbuf, err := d.rmrService.RecvMessage()
233 log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err)
234 d.processResult.Stats.ReceivedErrorCount++
238 _, ok := waitForRmrMessageType[mbuf.MType]
241 log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
242 d.processResult.Stats.ReceivedUnexpectedCount++
246 log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
247 d.processResult.Stats.ReceivedExpectedCount++
251 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) {
253 defer listenAndHandleWg.Done()
255 if repeatCount == 0 {
256 d.listenAndHandleNoRepeat(ctx)
260 d.listenAndHandleWithRepeat(ctx, repeatCount)
263 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
264 nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
266 for i := 0; i < nFields; i++ {
267 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
268 switch fieldValue.Kind() {
270 if fieldValue.Len() > 0 {
271 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
273 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
274 if fieldValue.Int() != 0 {
275 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
278 if fieldValue.Bool() {
279 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
281 case reflect.Float64, reflect.Float32:
282 if fieldValue.Float() != 0 {
283 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
286 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)