7db5f802b12ac572f5011d250cd4675f0f57116d
[ric-plt/xapp-frame.git] / test / xapp / generator.go
1 package main
2
3 import (
4         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
5         "sync"
6         "time"
7 )
8
9 var (
10         wg     sync.WaitGroup
11         mux    sync.Mutex
12         rx     int
13         tx     int
14         failed int
15 )
16
17 type Generator struct {
18 }
19
20 func (m Generator) Consume(params *xapp.RMRParams) (err error) {
21         xapp.Logger.Debug("message received - type=%d subId=%d meid=%v", params.Mtype, params.SubId, params.Meid)
22
23         mux.Lock()
24         rx++
25         mux.Unlock()
26
27         ack := xapp.Config.GetInt("test.waitForAck")
28         if ack != 0 {
29                 wg.Done()
30         }
31
32         return nil
33 }
34
35 func waitForMessages() {
36         done := make(chan struct{})
37         go func() {
38                 wg.Wait()
39                 close(done)
40         }()
41
42         select {
43         case <-done:
44         // All done!
45         case <-time.After(5000 * time.Millisecond):
46                 xapp.Logger.Warn("Message waiting timed out!")
47         }
48 }
49
50 func runTests(mtype, subId, amount, msize, ack int) {
51         tx = 0
52         rx = 0
53         s := make([]byte, msize, msize)
54
55         start := time.Now()
56         for i := 0; i < amount; i++ {
57                 params := &xapp.RMRParams{}
58                 params.Mtype = mtype
59                 params.SubId = subId
60                 params.Payload = s
61                 params.Meid = &xapp.RMRMeid{PlmnID: "123456", EnbID: "7788"}
62                 params.Xid = "TestXID"
63                 if ok := xapp.Rmr.SendMsg(params); ok {
64                         tx++
65                         if ack != 0 {
66                                 wg.Add(1)
67                         }
68                 } else {
69                         failed++
70                 }
71         }
72
73         // Wait until all replies are received, or timeout occurs
74         waitForMessages()
75
76         elapsed := time.Since(start)
77         xapp.Logger.Info("amount=%d|tx=%d|rx=%d|failed=%d|time=%v\n", amount, tx, rx, failed, elapsed)
78 }
79
80 func generator() {
81         // Start RMR and wait until engine is ready
82         go xapp.Rmr.Start(Generator{})
83         for xapp.Rmr.IsReady() == false {
84                 time.Sleep(time.Duration(2) * time.Second)
85         }
86
87         // Read parameters
88         interval := 1000000 * 1.0 / xapp.Config.GetInt("test.rate")
89         mtype := xapp.Config.GetInt("test.mtype")
90         subId := xapp.Config.GetInt("test.subId")
91         amount := xapp.Config.GetInt("test.amount")
92         size := xapp.Config.GetInt("test.size")
93         ack := xapp.Config.GetInt("test.waitForAck")
94         rounds := xapp.Config.GetInt("test.rounds")
95
96         // Now generate message load as per request
97         for i := 0; i < rounds; i++ {
98                 runTests(mtype, subId, amount, size, ack)
99                 if interval != 0 {
100                         time.Sleep(time.Duration(interval) * time.Microsecond)
101                 }
102         }
103
104         return
105 }