Merge "Changing status to connected state after timeout."
[ric-plt/e2mgr.git] / tools / xappmock / dispatcher / dispatcher.go
1 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
2 //  platform project (RICP).
3
4 // Copyright 2019 Nokia
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
9 //
10 //      http://www.apache.org/licenses/LICENSE-2.0
11 //
12 // Unless required by applicable law or agreed to in writing, software
13 // distributed under the License is distributed on an "AS IS" BASIS,
14 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 // See the License for the specific language governing permissions and
16 // limitations under the License.
17 //
18
19 //  This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 //  platform project (RICP).
21
22 package dispatcher
23
24 import (
25         "context"
26         "fmt"
27         "github.com/pkg/errors"
28         "reflect"
29         "sync"
30         "time"
31         "xappmock/enums"
32         "xappmock/logger"
33         "xappmock/models"
34         "xappmock/rmr"
35         "xappmock/sender"
36 )
37
38 // Id -> Command
39 var configuration = make(map[string]*models.JsonCommand)
40
41 // Rmr Message Id -> Command
42 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
43
44 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
45         rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
46
47         if err != nil {
48                 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
49         }
50
51         waitForRmrMessageType[int(rmrMsgId)] = &command
52         return nil
53 }
54
55 type Dispatcher struct {
56         rmrService    *rmr.Service
57         processResult models.ProcessResult
58         logger        *logger.Logger
59         jsonSender    *sender.JsonSender
60 }
61
62 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
63         return d.processResult
64 }
65
66 func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
67         return &Dispatcher{
68                 rmrService: rmrService,
69                 logger:     logger,
70                 jsonSender: jsonSender,
71         }
72 }
73
74 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
75         if len(cmd.Id) == 0 {
76                 return errors.New(fmt.Sprintf("invalid cmd, no id"))
77         }
78         configuration[cmd.Id] = &cmd
79         return nil
80
81         //      if len(cmd.ReceiveCommandId) == 0 {
82         //              return nil
83         //      }
84         //
85         //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
86 }
87
88 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
89
90         if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
91                 now := time.Now()
92                 d.processResult.StartTime = &now
93         }
94
95         err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
96
97         if err != nil {
98                 d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
99                 d.processResult.Err = err
100                 d.processResult.Stats.SentErrorCount.Inc()
101                 return err
102         }
103
104         d.processResult.Stats.SentCount.Inc()
105         return nil
106 }
107
108 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
109
110         if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
111                 now := time.Now()
112                 d.processResult.StartTime = &now
113         }
114
115         for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
116
117                 select {
118                 case <-ctx.Done():
119                         return
120                 default:
121                 }
122
123                 err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
124
125                 if err != nil {
126                         d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
127                         d.processResult.Stats.SentErrorCount.Inc()
128                         continue
129                 }
130
131                 d.processResult.Stats.SentCount.Inc()
132                 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
133         }
134 }
135
136 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
137         command, ok := configuration[receiveCommandId]
138
139         if !ok {
140                 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
141         }
142
143         if len(command.RmrMessageType) == 0 {
144                 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
145         }
146
147         return command.RmrMessageType, nil
148 }
149
150 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
151
152         defer sendAndReceiveWg.Done()
153         var listenAndHandleWg sync.WaitGroup
154
155         if len(command.ReceiveCommandId) > 0 {
156                 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
157
158                 if err != nil {
159                         d.processResult.Err = err
160                         return
161                 }
162
163                 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
164
165                 if err != nil {
166                         d.processResult.Err = err
167                         return
168                 }
169
170                 listenAndHandleWg.Add(1)
171                 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
172         }
173
174         if command.RepeatCount == 0 {
175                 err := d.sendNoRepeat(command)
176
177                 if err != nil {
178                         return
179                 }
180
181         } else {
182                 d.sendWithRepeat(ctx, command)
183         }
184
185         if len(command.ReceiveCommandId) > 0 {
186                 listenAndHandleWg.Wait()
187         }
188 }
189
190 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
191
192         defer sendAndReceiveWg.Done()
193
194         err := addRmrMessageToWaitFor(command.RmrMessageType, command)
195
196         if err != nil {
197                 d.processResult.Err = err
198                 return
199         }
200
201         var listenAndHandleWg sync.WaitGroup
202         listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
203         d.listenAndHandle(ctx, &listenAndHandleWg, command)
204 }
205
206 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
207         var command models.JsonCommand
208         if len(cmd.Id) == 0 {
209                 return command, errors.New(fmt.Sprintf("invalid command, no id"))
210         }
211
212         command = *cmd
213
214         conf, ok := configuration[cmd.Id]
215
216         if ok {
217                 command = *conf
218                 mergeConfigurationAndCommand(&command, cmd)
219         }
220
221         return command, nil
222 }
223
224 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
225
226         command, err := getMergedCommand(cmd)
227
228         if err != nil {
229                 d.processResult.Err = err
230                 return
231         }
232
233         var sendAndReceiveWg sync.WaitGroup
234
235         commandAction := enums.CommandAction(command.Action)
236
237         switch commandAction {
238
239         case enums.SendRmrMessage:
240                 sendAndReceiveWg.Add(1)
241                 go d.sendHandler(ctx, &sendAndReceiveWg, command)
242         case enums.ReceiveRmrMessage:
243                 sendAndReceiveWg.Add(1)
244                 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
245         default:
246                 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
247                 return
248         }
249
250         sendAndReceiveWg.Wait()
251 }
252
253 func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
254         responseCommand, ok := configuration[command.SendCommandId]
255
256         if !ok {
257                 return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
258         }
259
260         return responseCommand, nil
261 }
262
263 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
264         for {
265                 select {
266                 case <-ctx.Done():
267                         return
268                 default:
269                 }
270
271                 mbuf, err := d.rmrService.RecvMessage()
272
273                 if err != nil {
274                         d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
275                         d.processResult.Err = err
276                         d.processResult.Stats.ReceivedErrorCount.Inc()
277                         return
278                 }
279
280                 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
281                         now := time.Now()
282                         d.processResult.StartTime = &now
283                 }
284
285                 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
286
287                 _, ok := waitForRmrMessageType[mbuf.MType]
288
289                 if !ok {
290                         d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
291                         d.processResult.Stats.ReceivedUnexpectedCount.Inc()
292                         continue
293                 }
294
295                 d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
296                 d.processResult.Stats.ReceivedExpectedCount.Inc()
297
298                 if len(command.SendCommandId) > 0 {
299                         responseCommand, err := getResponseCommand(command)
300
301                         if err != nil {
302                                 d.processResult.Err = err
303                                 return
304                         }
305
306                         _ = d.sendNoRepeat(*responseCommand)
307                 }
308
309                 return
310         }
311 }
312
313 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
314
315         var responseCommand *models.JsonCommand
316
317         if len(command.SendCommandId) > 0 {
318                 var err error
319                 responseCommand, err = getResponseCommand(command)
320
321                 if err != nil {
322                         d.processResult.Err = err
323                         return
324                 }
325         }
326
327         for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
328                 select {
329                 case <-ctx.Done():
330                         return
331                 default:
332                 }
333
334                 mbuf, err := d.rmrService.RecvMessage()
335
336                 if err != nil {
337                         d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
338                         d.processResult.Stats.ReceivedErrorCount.Inc()
339                         continue
340                 }
341
342                 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
343                         now := time.Now()
344                         d.processResult.StartTime = &now
345                 }
346
347                 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
348
349                 _, ok := waitForRmrMessageType[mbuf.MType]
350
351                 if !ok {
352                         d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
353                         d.processResult.Stats.ReceivedUnexpectedCount.Inc()
354                         continue
355                 }
356
357                 d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
358                 d.processResult.Stats.ReceivedExpectedCount.Inc()
359
360                 if responseCommand != nil {
361                         _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
362                 }
363         }
364 }
365
366 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
367
368         defer listenAndHandleWg.Done()
369
370         if command.RepeatCount == 0 {
371                 d.listenAndHandleNoRepeat(ctx, command)
372                 return
373         }
374
375         d.listenAndHandleWithRepeat(ctx, command)
376 }
377
378 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
379         nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
380
381         for i := 0; i < nFields; i++ {
382                 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
383                         switch fieldValue.Kind() {
384                         case reflect.String:
385                                 if fieldValue.Len() > 0 {
386                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
387                                 }
388                         case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
389                                 if fieldValue.Int() != 0 {
390                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
391                                 }
392                         case reflect.Bool:
393                                 if fieldValue.Bool() {
394                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
395                                 }
396                         case reflect.Float64, reflect.Float32:
397                                 if fieldValue.Float() != 0 {
398                                         reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
399                                 }
400                         default:
401                                 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
402                         }
403                 }
404         }
405 }