[RICPLT-1528] xApp Mock dev.....
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
1 package dispatcher
2
3 import (
4         "context"
5         "fmt"
6         "github.com/pkg/errors"
7         "log"
8         "reflect"
9         "sync"
10         "time"
11         "xappmock/enums"
12         "xappmock/models"
13         "xappmock/rmr"
14         "xappmock/sender"
15 )
16
17 // Id -> Command
18 var configuration = make(map[string]*models.JsonCommand)
19 // Rmr Message Id -> Command
20 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
21
22 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
23         rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
24
25         if err != nil {
26                 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
27         }
28
29         waitForRmrMessageType[int(rmrMsgId)] = &command
30         return nil
31 }
32
33 type Dispatcher struct {
34         rmrService              *rmr.Service
35         processResult           models.ProcessResult
36 }
37
38 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
39         return d.processResult
40 }
41
42 func New(rmrService *rmr.Service) *Dispatcher {
43         return &Dispatcher{
44                 rmrService:              rmrService,
45         }
46 }
47
48 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
49         if len(cmd.Id) == 0 {
50                 return errors.New(fmt.Sprintf("invalid cmd, no id"))
51         }
52         configuration[cmd.Id] = &cmd
53         return nil
54
55         //      if len(cmd.ReceiveCommandId) == 0 {
56         //              return nil
57         //      }
58         //
59         //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
60 }
61
62 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
63         err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
64
65         if err != nil {
66                 log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
67                 d.processResult.Err = err
68                 d.processResult.Stats.SentErrorCount++
69                 return err
70         }
71
72         d.processResult.Stats.SentCount++
73         return nil
74 }
75
76 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
77         for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
78
79                 select {
80                 case <-ctx.Done():
81                         return
82                 default:
83                 }
84
85                 err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
86
87                 if err != nil {
88                         log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
89                         d.processResult.Stats.SentErrorCount++
90                         continue
91                 }
92
93                 d.processResult.Stats.SentCount++
94                 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
95         }
96 }
97
98 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
99         command, ok := configuration[receiveCommandId]
100
101         if !ok {
102                 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
103         }
104
105         if len(command.RmrMessageType) == 0 {
106                 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
107         }
108
109         return command.RmrMessageType, nil
110 }
111
112 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
113
114         defer sendAndReceiveWg.Done()
115         var listenAndHandleWg sync.WaitGroup
116
117         if len(command.ReceiveCommandId) > 0 {
118                 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
119
120                 if err != nil {
121                         d.processResult.Err = err
122                         return
123                 }
124
125                 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
126
127                 if err != nil {
128                         d.processResult.Err = err
129                         return
130                 }
131
132                 listenAndHandleWg.Add(1)
133                 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
134         }
135
136         if command.RepeatCount == 0 {
137                 err := d.sendNoRepeat(command)
138
139                 if err != nil {
140                         return
141                 }
142
143         } else {
144                 d.sendWithRepeat(ctx, command)
145         }
146
147         if len(command.ReceiveCommandId) > 0 {
148                 listenAndHandleWg.Wait()
149         }
150 }
151
152 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
153
154         defer sendAndReceiveWg.Done()
155
156         err := addRmrMessageToWaitFor(command.RmrMessageType, command)
157
158         if err != nil {
159                 d.processResult.Err = err
160                 return
161         }
162
163         var listenAndHandleWg sync.WaitGroup
164         listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
165         d.listenAndHandle(ctx, &listenAndHandleWg, command)
166 }
167
168 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
169         var command models.JsonCommand
170         if len(cmd.Id) == 0 {
171                 return command, errors.New(fmt.Sprintf("invalid command, no id"))
172         }
173
174         command = *cmd
175
176         conf, ok := configuration[cmd.Id]
177
178         if ok {
179                 command = *conf
180                 mergeConfigurationAndCommand(&command, cmd)
181         }
182
183         return command, nil
184 }
185
186 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
187
188         command, err := getMergedCommand(cmd)
189
190         if err != nil {
191                 d.processResult.Err = err
192                 return
193         }
194
195         var sendAndReceiveWg sync.WaitGroup
196
197         commandAction := enums.CommandAction(command.Action)
198
199         switch commandAction {
200
201         case enums.SendRmrMessage:
202                 sendAndReceiveWg.Add(1)
203                 go d.sendHandler(ctx, &sendAndReceiveWg, command)
204         case enums.ReceiveRmrMessage:
205                 sendAndReceiveWg.Add(1)
206                 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
207         default:
208                 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
209                 return
210         }
211
212         sendAndReceiveWg.Wait()
213 }
214
215 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
216         for {
217                 select {
218                 case <-ctx.Done():
219                         return
220                 default:
221                 }
222
223                 mbuf, err := d.rmrService.RecvMessage()
224
225                 if err != nil {
226                         d.processResult.Err = err
227                         d.processResult.Stats.ReceivedErrorCount++
228                         return
229                 }
230
231                 _, ok := waitForRmrMessageType[mbuf.MType]
232
233                 if !ok {
234                         log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
235                         d.processResult.Stats.ReceivedUnexpectedCount++
236                         continue
237                 }
238
239                 log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
240                 d.processResult.Stats.ReceivedExpectedCount++
241
242                 if len(command.SendCommandId) > 0 {
243                         responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
244                         _ = d.sendNoRepeat(*responseCommand)
245                 }
246
247                 return
248         }
249 }
250
251 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
252         for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
253                 select {
254                 case <-ctx.Done():
255                         return
256                 default:
257                 }
258
259                 mbuf, err := d.rmrService.RecvMessage()
260
261                 if err != nil {
262                         log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
263                         d.processResult.Stats.ReceivedErrorCount++
264                         continue
265                 }
266
267                 _, ok := waitForRmrMessageType[mbuf.MType]
268
269                 if !ok {
270                         log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
271                         d.processResult.Stats.ReceivedUnexpectedCount++
272                         continue
273                 }
274
275                 log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
276                 d.processResult.Stats.ReceivedExpectedCount++
277
278                 if len(command.SendCommandId) > 0 {
279                         responseCommand := configuration[command.SendCommandId]
280                         _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
281                 }
282         }
283 }
284
285 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
286
287         defer listenAndHandleWg.Done()
288
289         if command.RepeatCount == 0 {
290                 d.listenAndHandleNoRepeat(ctx, command)
291                 return
292         }
293
294         d.listenAndHandleWithRepeat(ctx, command)
295 }
296
297 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
298         nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
299
300         for i := 0; i < nFields; i++ {
301                 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
302                         switch fieldValue.Kind() {
303                         case reflect.String:
304                                 if fieldValue.Len() > 0 {
305                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
306                                 }
307                         case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
308                                 if fieldValue.Int() != 0 {
309                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
310                                 }
311                         case reflect.Bool:
312                                 if fieldValue.Bool() {
313                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
314                                 }
315                         case reflect.Float64, reflect.Float32:
316                                 if fieldValue.Float() != 0 {
317                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
318                                 }
319                         default:
320                                 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
321                         }
322                 }
323         }
324 }