[RICPLT-2585] Init fixes | Init UTs..........
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
1 package dispatcher
2
3 import (
4         "context"
5         "fmt"
6         "github.com/pkg/errors"
7         "log"
8         "reflect"
9         "sync"
10         "time"
11         "xappmock/enums"
12         "xappmock/models"
13         "xappmock/rmr"
14         "xappmock/sender"
15 )
16
17 // Id -> Command
18 var configuration = make(map[string]*models.JsonCommand)
19
20 // Rmr Message Id -> Command
21 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
22
23 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
24         rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
25
26         if err != nil {
27                 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
28         }
29
30         waitForRmrMessageType[int(rmrMsgId)] = &command
31         return nil
32 }
33
// Dispatcher runs xappmock JSON commands against an RMR service and keeps the
// outcome (last error and counters) of the work it has done.
type Dispatcher struct {
	// rmrService is the RMR service handed to New; it is used for sending and
	// receiving messages.
	rmrService    *rmr.Service
	// processResult holds the most recent error and the send/receive counters
	// updated by the handlers.
	processResult models.ProcessResult
}
38
39 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
40         return d.processResult
41 }
42
43 func New(rmrService *rmr.Service) *Dispatcher {
44         return &Dispatcher{
45                 rmrService: rmrService,
46         }
47 }
48
49 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
50         if len(cmd.Id) == 0 {
51                 return errors.New(fmt.Sprintf("invalid cmd, no id"))
52         }
53         configuration[cmd.Id] = &cmd
54         return nil
55
56         //      if len(cmd.ReceiveCommandId) == 0 {
57         //              return nil
58         //      }
59         //
60         //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
61 }
62
63 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
64         err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
65
66         if err != nil {
67                 log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
68                 d.processResult.Err = err
69                 d.processResult.Stats.SentErrorCount.Inc()
70                 return err
71         }
72
73         d.processResult.Stats.SentCount.Inc()
74         return nil
75 }
76
77 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
78         for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
79
80                 select {
81                 case <-ctx.Done():
82                         return
83                 default:
84                 }
85
86                 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
87
88                 if err != nil {
89                         log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
90                         d.processResult.Stats.SentErrorCount.Inc()
91                         continue
92                 }
93
94                 d.processResult.Stats.SentCount.Inc()
95                 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
96         }
97 }
98
99 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
100         command, ok := configuration[receiveCommandId]
101
102         if !ok {
103                 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
104         }
105
106         if len(command.RmrMessageType) == 0 {
107                 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
108         }
109
110         return command.RmrMessageType, nil
111 }
112
113 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
114
115         defer sendAndReceiveWg.Done()
116         var listenAndHandleWg sync.WaitGroup
117
118         if len(command.ReceiveCommandId) > 0 {
119                 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
120
121                 if err != nil {
122                         d.processResult.Err = err
123                         return
124                 }
125
126                 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
127
128                 if err != nil {
129                         d.processResult.Err = err
130                         return
131                 }
132
133                 listenAndHandleWg.Add(1)
134                 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
135         }
136
137         if command.RepeatCount == 0 {
138                 err := d.sendNoRepeat(command)
139
140                 if err != nil {
141                         return
142                 }
143
144         } else {
145                 d.sendWithRepeat(ctx, command)
146         }
147
148         if len(command.ReceiveCommandId) > 0 {
149                 listenAndHandleWg.Wait()
150         }
151 }
152
153 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
154
155         defer sendAndReceiveWg.Done()
156
157         err := addRmrMessageToWaitFor(command.RmrMessageType, command)
158
159         if err != nil {
160                 d.processResult.Err = err
161                 return
162         }
163
164         var listenAndHandleWg sync.WaitGroup
165         listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
166         d.listenAndHandle(ctx, &listenAndHandleWg, command)
167 }
168
169 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
170         var command models.JsonCommand
171         if len(cmd.Id) == 0 {
172                 return command, errors.New(fmt.Sprintf("invalid command, no id"))
173         }
174
175         command = *cmd
176
177         conf, ok := configuration[cmd.Id]
178
179         if ok {
180                 command = *conf
181                 mergeConfigurationAndCommand(&command, cmd)
182         }
183
184         return command, nil
185 }
186
187 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
188
189         command, err := getMergedCommand(cmd)
190
191         if err != nil {
192                 d.processResult.Err = err
193                 return
194         }
195
196         var sendAndReceiveWg sync.WaitGroup
197
198         commandAction := enums.CommandAction(command.Action)
199
200         switch commandAction {
201
202         case enums.SendRmrMessage:
203                 sendAndReceiveWg.Add(1)
204                 go d.sendHandler(ctx, &sendAndReceiveWg, command)
205         case enums.ReceiveRmrMessage:
206                 sendAndReceiveWg.Add(1)
207                 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
208         default:
209                 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
210                 return
211         }
212
213         sendAndReceiveWg.Wait()
214 }
215
216 func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
217         responseCommand, ok := configuration[command.SendCommandId]
218
219         if !ok {
220                 return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
221         }
222
223         return responseCommand, nil
224 }
225
226 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
227         for {
228                 select {
229                 case <-ctx.Done():
230                         return
231                 default:
232                 }
233
234                 mbuf, err := d.rmrService.RecvMessage()
235
236                 if err != nil {
237                         log.Printf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
238                         d.processResult.Err = err
239                         d.processResult.Stats.ReceivedErrorCount.Inc()
240                         return
241                 }
242
243                 messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
244
245                 _, ok := waitForRmrMessageType[mbuf.MType]
246
247                 if !ok {
248                         log.Printf("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
249                         d.processResult.Stats.ReceivedUnexpectedCount.Inc()
250                         continue
251                 }
252
253                 log.Printf("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
254                 d.processResult.Stats.ReceivedExpectedCount.Inc()
255
256                 if len(command.SendCommandId) > 0 {
257                         responseCommand, err := getResponseCommand(command)
258
259                         if err != nil {
260                                 d.processResult.Err = err
261                                 return
262                         }
263
264                         _ = d.sendNoRepeat(*responseCommand)
265                 }
266
267                 return
268         }
269 }
270
271 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
272
273         var responseCommand *models.JsonCommand
274
275         if len(command.SendCommandId) > 0 {
276                 var err error
277                 responseCommand, err = getResponseCommand(command)
278
279                 if err != nil {
280                         d.processResult.Err = err
281                         return
282                 }
283         }
284
285         for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
286                 select {
287                 case <-ctx.Done():
288                         return
289                 default:
290                 }
291
292                 mbuf, err := d.rmrService.RecvMessage()
293
294                 if err != nil {
295                         log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
296                         d.processResult.Stats.ReceivedErrorCount.Inc()
297                         continue
298                 }
299
300                 messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
301
302                 _, ok := waitForRmrMessageType[mbuf.MType]
303
304                 if !ok {
305                         log.Printf("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
306                         d.processResult.Stats.ReceivedUnexpectedCount.Inc()
307                         continue
308                 }
309
310                 log.Printf("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
311                 d.processResult.Stats.ReceivedExpectedCount.Inc()
312
313                 if responseCommand != nil {
314                         _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
315                 }
316         }
317 }
318
319 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
320
321         defer listenAndHandleWg.Done()
322
323         if command.RepeatCount == 0 {
324                 d.listenAndHandleNoRepeat(ctx, command)
325                 return
326         }
327
328         d.listenAndHandleWithRepeat(ctx, command)
329 }
330
331 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
332         nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
333
334         for i := 0; i < nFields; i++ {
335                 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
336                         switch fieldValue.Kind() {
337                         case reflect.String:
338                                 if fieldValue.Len() > 0 {
339                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
340                                 }
341                         case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
342                                 if fieldValue.Int() != 0 {
343                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
344                                 }
345                         case reflect.Bool:
346                                 if fieldValue.Bool() {
347                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
348                                 }
349                         case reflect.Float64, reflect.Float32:
350                                 if fieldValue.Float() != 0 {
351                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
352                                 }
353                         default:
354                                 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
355                         }
356                 }
357         }
358 }