4a86edbebd74d5ea76db5328a519a9dbf9d5ee75
[ric-plt/xapp-frame.git] / test / xapp / generator.go
1 package main
2
3 import (
4         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
5         "sync"
6         "time"
7 )
8
9 var (
10         wg     sync.WaitGroup
11         mux    sync.Mutex
12         rx     int
13         tx     int
14         failed int
15 )
16
17 type Generator struct {
18 }
19
20 func (m Generator) Consume(mtype, subId, len int, payload []byte) (err error) {
21         xapp.Logger.Debug("message received - type=%d subId=%d len=%d", mtype, subId, len)
22
23         mux.Lock()
24         rx++
25         mux.Unlock()
26
27         ack := xapp.Config.GetInt("test.waitForAck")
28         if ack != 0 {
29                 wg.Done()
30         }
31
32         return nil
33 }
34
35 func waitForMessages() {
36         done := make(chan struct{})
37         go func() {
38                 wg.Wait()
39                 close(done)
40         }()
41
42         select {
43         case <-done:
44         // All done!
45         case <-time.After(5000 * time.Millisecond):
46                 xapp.Logger.Warn("Message waiting timed out!")
47         }
48 }
49
50 func runTests(mtype, subId, amount, msize, ack int) {
51         tx = 0
52         rx = 0
53         s := make([]byte, msize, msize)
54
55         start := time.Now()
56         for i := 0; i < amount; i++ {
57                 if ok := xapp.Rmr.Send(mtype, subId, msize, s); ok {
58                         tx++
59                         if ack != 0 {
60                                 wg.Add(1)
61                         }
62                 } else {
63                         failed++
64                 }
65         }
66
67         // Wait until all replies are received, or timeout occurs
68         waitForMessages()
69
70         elapsed := time.Since(start)
71         xapp.Logger.Info("amount=%d|tx=%d|rx=%d|failed=%d|time=%v\n", amount, tx, rx, failed, elapsed)
72 }
73
74 func generator() {
75         // Start RMR and wait until engine is ready
76         go xapp.Rmr.Start(Generator{})
77         for xapp.Rmr.IsReady() == false {
78                 time.Sleep(time.Duration(2) * time.Second)
79         }
80
81         // Read parameters
82         interval := 1000000 * 1.0 / xapp.Config.GetInt("test.rate")
83         mtype := xapp.Config.GetInt("test.mtype")
84         subId := xapp.Config.GetInt("test.subId")
85         amount := xapp.Config.GetInt("test.amount")
86         size := xapp.Config.GetInt("test.size")
87         ack := xapp.Config.GetInt("test.waitForAck")
88         rounds := xapp.Config.GetInt("test.rounds")
89
90         // Now generate message load as per request
91         for i := 0; i < rounds; i++ {
92                 runTests(mtype, subId, amount, size, ack)
93                 if interval != 0 {
94                         time.Sleep(time.Duration(interval) * time.Microsecond)
95                 }
96         }
97
98         return
99 }