155f92ceb9086399e540b2846532858932226082
[ric-plt/submgr.git] / pkg / control / main_test.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "errors"
25         "fmt"
26         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         "io/ioutil"
29         "net/http"
30         "os"
31         "strconv"
32         "strings"
33         "testing"
34         "time"
35 )
36
37 //-----------------------------------------------------------------------------
38 //
39 //-----------------------------------------------------------------------------
40 type testingControl struct {
41         desc     string
42         syncChan chan struct{}
43 }
44
45 func (tc *testingControl) ReadyCB(data interface{}) {
46         xapp.Logger.Info("testingControl(%s) ReadyCB", tc.desc)
47         tc.syncChan <- struct{}{}
48         return
49 }
50
51 func (tc *testingControl) WaitCB() {
52         <-tc.syncChan
53 }
54
55 func initTestingControl(desc string, rtfile string, port string) testingControl {
56         tc := testingControl{}
57         os.Setenv("RMR_SEED_RT", rtfile)
58         os.Setenv("RMR_SRC_ID", "localhost:"+port)
59         xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
60         xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
61         tc.desc = strings.ToUpper(desc)
62         tc.syncChan = make(chan struct{})
63         return tc
64 }
65
66 //-----------------------------------------------------------------------------
67 //
68 //-----------------------------------------------------------------------------
69 type testingRmrControl struct {
70         testingControl
71         rmrClientTest *xapp.RMRClient
72 }
73
74 func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
75         //
76         //NOTE: Do this way until xapp-frame sending is improved
77         //
78         status := false
79         i := 1
80         for ; i <= 10 && status == false; i++ {
81                 status = tc.rmrClientTest.SendMsg(params)
82                 if status == false {
83                         xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
84                         time.Sleep(500 * time.Millisecond)
85                 }
86         }
87         if status == false {
88                 err = errors.New("rmr.Send() failed")
89                 tc.rmrClientTest.Free(params.Mbuf)
90         }
91         return
92 }
93
94 func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
95         tc := testingRmrControl{}
96         tc.testingControl = initTestingControl(desc, rtfile, port)
97         tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
98         tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
99         go tc.rmrClientTest.Start(consumer)
100         tc.WaitCB()
101         return tc
102 }
103
104 //-----------------------------------------------------------------------------
105 //
106 //-----------------------------------------------------------------------------
107 type testingMessageChannel struct {
108         rmrConChan chan *xapp.RMRParams
109 }
110
111 func initTestingMessageChannel() testingMessageChannel {
112         mc := testingMessageChannel{}
113         mc.rmrConChan = make(chan *xapp.RMRParams)
114         return mc
115 }
116
117 //-----------------------------------------------------------------------------
118 //
119 //-----------------------------------------------------------------------------
120 type xappTransaction struct {
121         tc   *testingXappControl
122         xid  string
123         meid *xapp.RMRMeid
124 }
125
126 type testingXappControl struct {
127         testingRmrControl
128         testingMessageChannel
129         xid_seq uint64
130 }
131
132 func (tc *testingXappControl) newXid() string {
133         var xid string
134         xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
135         tc.xid_seq++
136         return xid
137 }
138
139 func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
140         trans := &xappTransaction{}
141         trans.tc = tc
142         if xid == nil {
143                 trans.xid = tc.newXid()
144         } else {
145                 trans.xid = *xid
146         }
147         trans.meid = &xapp.RMRMeid{RanName: ranname}
148         return trans
149 }
150
151 func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
152
153         if strings.Contains(msg.Xid, tc.desc) {
154                 xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
155                 tc.rmrConChan <- msg
156         } else {
157                 xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid, tc.desc)
158         }
159         return
160 }
161
162 func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl {
163         xappCtrl := &testingXappControl{}
164         xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
165         xappCtrl.testingMessageChannel = initTestingMessageChannel()
166         xappCtrl.xid_seq = 1
167         return xappCtrl
168 }
169
170 //-----------------------------------------------------------------------------
171 //
172 //-----------------------------------------------------------------------------
173 type testingE2termControl struct {
174         testingRmrControl
175         testingMessageChannel
176 }
177
178 func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) {
179         xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
180         tc.rmrConChan <- msg
181         return
182 }
183
184 func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
185         e2termCtrl := &testingE2termControl{}
186         e2termCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
187         e2termCtrl.testingMessageChannel = initTestingMessageChannel()
188         return e2termCtrl
189 }
190
191 //-----------------------------------------------------------------------------
192 //
193 //-----------------------------------------------------------------------------
194 type testingMainControl struct {
195         testingControl
196         c *Control
197 }
198
199 func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
200         mainCtrl = &testingMainControl{}
201         mainCtrl.testingControl = initTestingControl(desc, rtfile, port)
202         mainCtrl.c = NewControl()
203         xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
204         go xapp.RunWithParams(mainCtrl.c, false)
205         mainCtrl.WaitCB()
206         return mainCtrl
207 }
208
209 //-----------------------------------------------------------------------------
210 //
211 //-----------------------------------------------------------------------------
212
213 func testError(t *testing.T, pattern string, args ...interface{}) {
214         xapp.Logger.Error(fmt.Sprintf(pattern, args...))
215         t.Errorf(fmt.Sprintf(pattern, args...))
216 }
217
218 func testLog(t *testing.T, pattern string, args ...interface{}) {
219         xapp.Logger.Info(fmt.Sprintf(pattern, args...))
220         t.Logf(fmt.Sprintf(pattern, args...))
221 }
222
223 func testCreateTmpFile(str string) (string, error) {
224         file, err := ioutil.TempFile("/tmp", "*.rt")
225         if err != nil {
226                 return "", err
227         }
228         _, err = file.WriteString(str)
229         if err != nil {
230                 file.Close()
231                 return "", err
232         }
233         return file.Name(), nil
234 }
235
236 //-----------------------------------------------------------------------------
237 //
238 //-----------------------------------------------------------------------------
239
240 var xappConn1 *testingXappControl
241 var xappConn2 *testingXappControl
242 var e2termConn *testingE2termControl
243 var mainCtrl *testingMainControl
244
245 func TestMain(m *testing.M) {
246         xapp.Logger.Info("TestMain start")
247
248         //
249         //Cfg creation won't work like this as xapp-frame reads it during init.
250         //
251         /*
252             cfgstr:=`{
253               "local": {
254                   "host": ":8080"
255               },
256               "logger": {
257                   "level": 4
258               },
259               "rmr": {
260                  "protPort": "tcp:14560",
261                  "maxSize": 4096,
262                  "numWorkers": 1,
263                  "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
264                  "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
265               },
266               "db": {
267                   "host": "localhost",
268                   "port": 6379,
269                   "namespaces": ["sdl", "rnib"]
270               },
271                  "rtmgr" : {
272                    "HostAddr" : "localhost",
273                    "port" : "8989",
274                    "baseUrl" : "/"
275                  }
276            `
277
278            cfgfilename,_ := testCreateTmpFile(cfgstr)
279            defer os.Remove(cfgfilename)
280            os.Setenv("CFG_FILE", cfgfilename)
281         */
282         xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
283
284         //---------------------------------
285         // NOTE: Routing table is configured so, that responses
286         //       are duplicated to xapp1 and xapp2 instances.
287         //       If XID is not matching xapp stub will just
288         //       drop message. (Messages 12011, 12012, 12021, 12022)
289         //---------------------------------
290         xapp.Logger.Info("### submgr main run ###")
291
292         subsrt := `newrt|start
293 mse|12010|-1|localhost:14560
294 mse|12010,localhost:14560|-1|localhost:15560
295 mse|12011,localhost:15560|-1|localhost:14560
296 mse|12011|-1|localhost:13560;localhost:13660
297 mse|12012,localhost:15560|-1|localhost:14560
298 mse|12012|-1|localhost:13560;localhost:13660
299 mse|12020|-1|localhost:14560
300 mse|12020,localhost:14560|-1|localhost:15560
301 mse|12021,localhost:15560|-1|localhost:14560
302 mse|12021|-1|localhost:13560;localhost:13660
303 mse|12022,localhost:15560|-1|localhost:14560
304 mse|12022|-1|localhost:13560;localhost:13660
305 newrt|end
306 `
307         subrtfilename, _ := testCreateTmpFile(subsrt)
308         defer os.Remove(subrtfilename)
309         mainCtrl = createNewMainControl("main", subrtfilename, "14560")
310
311         //---------------------------------
312         //
313         //---------------------------------
314         xapp.Logger.Info("### xapp1 rmr run ###")
315
316         xapprt1 := `newrt|start
317 mse|12010|-1|localhost:14560
318 mse|12011|-1|localhost:13560
319 mse|12012|-1|localhost:13560
320 mse|12020|-1|localhost:14560
321 mse|12021|-1|localhost:13560
322 mse|12022|-1|localhost:13560
323 newrt|end
324 `
325
326         xapprtfilename1, _ := testCreateTmpFile(xapprt1)
327         defer os.Remove(xapprtfilename1)
328         xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB")
329
330         //---------------------------------
331         //
332         //---------------------------------
333
334         xapp.Logger.Info("### xapp2 rmr run ###")
335
336         xapprt2 := `newrt|start
337 mse|12010|-1|localhost:14560
338 mse|12011|-1|localhost:13660
339 mse|12012|-1|localhost:13660
340 mse|12020|-1|localhost:14560
341 mse|12021|-1|localhost:13660
342 mse|12022|-1|localhost:13660
343 newrt|end
344 `
345
346         xapprtfilename2, _ := testCreateTmpFile(xapprt2)
347         defer os.Remove(xapprtfilename2)
348         xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB")
349
350         //---------------------------------
351         //
352         //---------------------------------
353         xapp.Logger.Info("### e2term rmr run ###")
354
355         e2termrt := `newrt|start
356 mse|12010|-1|localhost:15560
357 mse|12011|-1|localhost:14560
358 mse|12012|-1|localhost:14560
359 mse|12020|-1|localhost:15560
360 mse|12021|-1|localhost:14560
361 mse|12022|-1|localhost:14560
362 newrt|end
363 `
364
365         e2termrtfilename, _ := testCreateTmpFile(e2termrt)
366         defer os.Remove(e2termrtfilename)
367         e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
368
369         //---------------------------------
370         //
371         //---------------------------------
372         http_handler := func(w http.ResponseWriter, r *http.Request) {
373                 var req rtmgr_models.XappSubscriptionData
374                 err := json.NewDecoder(r.Body).Decode(&req)
375                 if err != nil {
376                         xapp.Logger.Error("%s", err.Error())
377                 }
378                 xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
379
380                 w.WriteHeader(200)
381         }
382
383         go func() {
384                 http.HandleFunc("/", http_handler)
385                 http.ListenAndServe("localhost:8989", nil)
386         }()
387
388         //---------------------------------
389         //
390         //---------------------------------
391         code := m.Run()
392         os.Exit(code)
393 }