Merge "Copy latest code"
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
1 package dispatcher
2
3 import (
4         "context"
5         "fmt"
6         "github.com/pkg/errors"
7         "reflect"
8         "sync"
9         "time"
10         "xappmock/enums"
11         "xappmock/logger"
12         "xappmock/models"
13         "xappmock/rmr"
14         "xappmock/sender"
15 )
16
17 // Id -> Command
18 var configuration = make(map[string]*models.JsonCommand)
19
20 // Rmr Message Id -> Command
21 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
22
23 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
24         rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
25
26         if err != nil {
27                 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
28         }
29
30         waitForRmrMessageType[int(rmrMsgId)] = &command
31         return nil
32 }
33
34 type Dispatcher struct {
35         rmrService    *rmr.Service
36         processResult models.ProcessResult
37         logger        *logger.Logger
38         jsonSender    *sender.JsonSender
39 }
40
41 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
42         return d.processResult
43 }
44
45 func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
46         return &Dispatcher{
47                 rmrService: rmrService,
48                 logger:     logger,
49                 jsonSender: jsonSender,
50         }
51 }
52
53 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
54         if len(cmd.Id) == 0 {
55                 return errors.New(fmt.Sprintf("invalid cmd, no id"))
56         }
57         configuration[cmd.Id] = &cmd
58         return nil
59
60         //      if len(cmd.ReceiveCommandId) == 0 {
61         //              return nil
62         //      }
63         //
64         //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
65 }
66
67 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
68
69         if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
70                 now := time.Now()
71                 d.processResult.StartTime = &now
72         }
73
74         err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
75
76         if err != nil {
77                 d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
78                 d.processResult.Err = err
79                 d.processResult.Stats.SentErrorCount.Inc()
80                 return err
81         }
82
83         d.processResult.Stats.SentCount.Inc()
84         return nil
85 }
86
87 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
88
89         if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
90                 now := time.Now()
91                 d.processResult.StartTime = &now
92         }
93
94         for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
95
96                 select {
97                 case <-ctx.Done():
98                         return
99                 default:
100                 }
101
102                 err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
103
104                 if err != nil {
105                         d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
106                         d.processResult.Stats.SentErrorCount.Inc()
107                         continue
108                 }
109
110                 d.processResult.Stats.SentCount.Inc()
111                 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
112         }
113 }
114
115 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
116         command, ok := configuration[receiveCommandId]
117
118         if !ok {
119                 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
120         }
121
122         if len(command.RmrMessageType) == 0 {
123                 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
124         }
125
126         return command.RmrMessageType, nil
127 }
128
129 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
130
131         defer sendAndReceiveWg.Done()
132         var listenAndHandleWg sync.WaitGroup
133
134         if len(command.ReceiveCommandId) > 0 {
135                 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
136
137                 if err != nil {
138                         d.processResult.Err = err
139                         return
140                 }
141
142                 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
143
144                 if err != nil {
145                         d.processResult.Err = err
146                         return
147                 }
148
149                 listenAndHandleWg.Add(1)
150                 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
151         }
152
153         if command.RepeatCount == 0 {
154                 err := d.sendNoRepeat(command)
155
156                 if err != nil {
157                         return
158                 }
159
160         } else {
161                 d.sendWithRepeat(ctx, command)
162         }
163
164         if len(command.ReceiveCommandId) > 0 {
165                 listenAndHandleWg.Wait()
166         }
167 }
168
169 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
170
171         defer sendAndReceiveWg.Done()
172
173         err := addRmrMessageToWaitFor(command.RmrMessageType, command)
174
175         if err != nil {
176                 d.processResult.Err = err
177                 return
178         }
179
180         var listenAndHandleWg sync.WaitGroup
181         listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
182         d.listenAndHandle(ctx, &listenAndHandleWg, command)
183 }
184
185 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
186         var command models.JsonCommand
187         if len(cmd.Id) == 0 {
188                 return command, errors.New(fmt.Sprintf("invalid command, no id"))
189         }
190
191         command = *cmd
192
193         conf, ok := configuration[cmd.Id]
194
195         if ok {
196                 command = *conf
197                 mergeConfigurationAndCommand(&command, cmd)
198         }
199
200         return command, nil
201 }
202
203 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
204
205         command, err := getMergedCommand(cmd)
206
207         if err != nil {
208                 d.processResult.Err = err
209                 return
210         }
211
212         var sendAndReceiveWg sync.WaitGroup
213
214         commandAction := enums.CommandAction(command.Action)
215
216         switch commandAction {
217
218         case enums.SendRmrMessage:
219                 sendAndReceiveWg.Add(1)
220                 go d.sendHandler(ctx, &sendAndReceiveWg, command)
221         case enums.ReceiveRmrMessage:
222                 sendAndReceiveWg.Add(1)
223                 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
224         default:
225                 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
226                 return
227         }
228
229         sendAndReceiveWg.Wait()
230 }
231
232 func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
233         responseCommand, ok := configuration[command.SendCommandId]
234
235         if !ok {
236                 return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
237         }
238
239         return responseCommand, nil
240 }
241
242 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
243         for {
244                 select {
245                 case <-ctx.Done():
246                         return
247                 default:
248                 }
249
250                 mbuf, err := d.rmrService.RecvMessage()
251
252                 if err != nil {
253                         d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
254                         d.processResult.Err = err
255                         d.processResult.Stats.ReceivedErrorCount.Inc()
256                         return
257                 }
258
259                 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
260                         now := time.Now()
261                         d.processResult.StartTime = &now
262                 }
263
264                 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
265
266                 _, ok := waitForRmrMessageType[mbuf.MType]
267
268                 if !ok {
269                         d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
270                         d.processResult.Stats.ReceivedUnexpectedCount.Inc()
271                         continue
272                 }
273
274                 d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
275                 d.processResult.Stats.ReceivedExpectedCount.Inc()
276
277                 if len(command.SendCommandId) > 0 {
278                         responseCommand, err := getResponseCommand(command)
279
280                         if err != nil {
281                                 d.processResult.Err = err
282                                 return
283                         }
284
285                         _ = d.sendNoRepeat(*responseCommand)
286                 }
287
288                 return
289         }
290 }
291
292 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
293
294         var responseCommand *models.JsonCommand
295
296         if len(command.SendCommandId) > 0 {
297                 var err error
298                 responseCommand, err = getResponseCommand(command)
299
300                 if err != nil {
301                         d.processResult.Err = err
302                         return
303                 }
304         }
305
306         for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
307                 select {
308                 case <-ctx.Done():
309                         return
310                 default:
311                 }
312
313                 mbuf, err := d.rmrService.RecvMessage()
314
315                 if err != nil {
316                         d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
317                         d.processResult.Stats.ReceivedErrorCount.Inc()
318                         continue
319                 }
320
321                 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
322                         now := time.Now()
323                         d.processResult.StartTime = &now
324                 }
325
326                 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
327
328                 _, ok := waitForRmrMessageType[mbuf.MType]
329
330                 if !ok {
331                         d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
332                         d.processResult.Stats.ReceivedUnexpectedCount.Inc()
333                         continue
334                 }
335
336                 d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
337                 d.processResult.Stats.ReceivedExpectedCount.Inc()
338
339                 if responseCommand != nil {
340                         _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
341                 }
342         }
343 }
344
345 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
346
347         defer listenAndHandleWg.Done()
348
349         if command.RepeatCount == 0 {
350                 d.listenAndHandleNoRepeat(ctx, command)
351                 return
352         }
353
354         d.listenAndHandleWithRepeat(ctx, command)
355 }
356
357 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
358         nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
359
360         for i := 0; i < nFields; i++ {
361                 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
362                         switch fieldValue.Kind() {
363                         case reflect.String:
364                                 if fieldValue.Len() > 0 {
365                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
366                                 }
367                         case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
368                                 if fieldValue.Int() != 0 {
369                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
370                                 }
371                         case reflect.Bool:
372                                 if fieldValue.Bool() {
373                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
374                                 }
375                         case reflect.Float64, reflect.Float32:
376                                 if fieldValue.Float() != 0 {
377                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
378                                 }
379                         default:
380                                 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
381                         }
382                 }
383         }
384 }