sync R3 content from Azure
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
1 package dispatcher
2
3 import (
4         "context"
5         "fmt"
6         "github.com/pkg/errors"
7         "log"
8         "reflect"
9         "sync"
10         "time"
11         "xappmock/enums"
12         "xappmock/models"
13         "xappmock/rmr"
14         "xappmock/sender"
15 )
16
17 // Id -> Command
18 var configuration = make(map[string]*models.JsonCommand)
19 // Rmr Message Id -> Command
20 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
21
22 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
23         rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
24
25         if err != nil {
26                 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
27         }
28
29         waitForRmrMessageType[int(rmrMsgId)] = &command
30         return nil
31 }
32
// Dispatcher runs JSON commands against an RMR service and accumulates the
// outcome (last error and send/receive counters) in processResult.
type Dispatcher struct {
	// rmrService is the RMR handle used for sending and receiving messages.
	rmrService    *rmr.Service
	// processResult holds the error and statistics updated while commands run.
	processResult models.ProcessResult
}
37
38 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
39         return d.processResult
40 }
41
42 func New(rmrService *rmr.Service) *Dispatcher {
43         return &Dispatcher{
44                 rmrService: rmrService,
45         }
46 }
47
48 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
49         if len(cmd.Id) == 0 {
50                 return errors.New(fmt.Sprintf("invalid cmd, no id"))
51         }
52         configuration[cmd.Id] = &cmd
53
54         if len(cmd.ReceiveRmrMessageType) == 0 {
55                 return nil
56         }
57
58         return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd)
59 }
60
61 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) {
62         err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
63
64         if err != nil {
65                 log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
66                 d.processResult.Err = err
67                 d.processResult.Stats.SentErrorCount++
68                 return
69         }
70
71         d.processResult.Stats.SentCount++
72
73 }
74
75 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
76         for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
77
78                 select {
79                 case <-ctx.Done():
80                         return
81                 default:
82                 }
83
84                 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
85
86                 if err != nil {
87                         log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
88                         d.processResult.Stats.SentErrorCount++
89                         continue
90                 }
91
92                 d.processResult.Stats.SentCount++
93                 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
94         }
95 }
96
97 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
98
99         defer sendAndReceiveWg.Done()
100         var listenAndHandleWg sync.WaitGroup
101
102         if len(command.ReceiveRmrMessageType) > 0 {
103                 err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
104
105                 if err != nil {
106                         d.processResult.Err = err
107                         return
108                 }
109
110                 listenAndHandleWg.Add(1)
111                 go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
112         }
113
114         if command.RepeatCount == 0 {
115                 d.sendNoRepeat(command)
116         } else {
117                 d.sendWithRepeat(ctx, command)
118         }
119
120         if len(command.ReceiveRmrMessageType) > 0 {
121                 listenAndHandleWg.Wait()
122         }
123 }
124
125 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
126
127         defer sendAndReceiveWg.Done()
128
129         err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
130
131         if err != nil {
132                 d.processResult.Err = err
133                 return
134         }
135
136         var listenAndHandleWg sync.WaitGroup
137         listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
138         d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
139 }
140
141 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
142         var command models.JsonCommand
143         if len(cmd.Id) == 0 {
144                 return command, errors.New(fmt.Sprintf("invalid command, no id"))
145         }
146
147         command = *cmd
148
149         conf, ok := configuration[cmd.Id]
150
151         if ok {
152                 command = *conf
153                 mergeConfigurationAndCommand(&command, cmd)
154         }
155
156         return command, nil
157 }
158
159 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
160
161         command, err := getMergedCommand(cmd)
162
163         if err != nil {
164                 d.processResult.Err = err
165                 return
166         }
167
168         var sendAndReceiveWg sync.WaitGroup
169
170         commandAction := enums.CommandAction(command.Action)
171
172         switch commandAction {
173
174         case enums.SendRmrMessage:
175                 sendAndReceiveWg.Add(1)
176                 go d.sendHandler(ctx, &sendAndReceiveWg, command)
177         case enums.ReceiveRmrMessage:
178                 sendAndReceiveWg.Add(1)
179                 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
180         default:
181                 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
182                 return
183         }
184
185         sendAndReceiveWg.Wait()
186 }
187
188 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
189         for {
190                 select {
191                 case <-ctx.Done():
192                         return
193                 default:
194                 }
195
196                 mbuf, err := d.rmrService.RecvMessage()
197
198                 if err != nil {
199                         d.processResult.Err = err
200                         d.processResult.Stats.ReceivedErrorCount++
201                         return
202                 }
203
204                 _, ok := waitForRmrMessageType[mbuf.MType]
205
206                 if !ok {
207                         log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
208                         d.processResult.Stats.ReceivedUnexpectedCount++
209                         continue
210                 }
211
212                 log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
213                 d.processResult.Stats.ReceivedExpectedCount++
214                 return
215         }
216 }
217
// receive is an empty placeholder: it accepts ctx but performs no work.
func (d *Dispatcher) receive(ctx context.Context) {

}
221
222 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) {
223         for d.processResult.Stats.ReceivedExpectedCount < repeatCount {
224                 select {
225                 case <-ctx.Done():
226                         return
227                 default:
228                 }
229
230                 mbuf, err := d.rmrService.RecvMessage()
231
232                 if err != nil {
233                         log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err)
234                         d.processResult.Stats.ReceivedErrorCount++
235                         continue
236                 }
237
238                 _, ok := waitForRmrMessageType[mbuf.MType]
239
240                 if !ok {
241                         log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
242                         d.processResult.Stats.ReceivedUnexpectedCount++
243                         continue
244                 }
245
246                 log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
247                 d.processResult.Stats.ReceivedExpectedCount++
248         }
249 }
250
251 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) {
252
253         defer listenAndHandleWg.Done()
254
255         if repeatCount == 0 {
256                 d.listenAndHandleNoRepeat(ctx)
257                 return
258         }
259
260         d.listenAndHandleWithRepeat(ctx, repeatCount)
261 }
262
263 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
264         nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
265
266         for i := 0; i < nFields; i++ {
267                 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
268                         switch fieldValue.Kind() {
269                         case reflect.String:
270                                 if fieldValue.Len() > 0 {
271                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
272                                 }
273                         case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
274                                 if fieldValue.Int() != 0 {
275                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
276                                 }
277                         case reflect.Bool:
278                                 if fieldValue.Bool() {
279                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
280                                 }
281                         case reflect.Float64, reflect.Float32:
282                                 if fieldValue.Float() != 0 {
283                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
284                                 }
285                         default:
286                                 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
287                         }
288                 }
289         }
290 }