Improving unittests to handle rtmgr http api
[ric-plt/submgr.git] / pkg / control / main_test.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "io/ioutil"
28         "net/http"
29         "os"
30         "strconv"
31         "strings"
32         "testing"
33         "time"
34 )
35
36 //-----------------------------------------------------------------------------
37 //
38 //-----------------------------------------------------------------------------
39
40 type httpRtmgrMsg struct {
41         msg *rtmgr_models.XappSubscriptionData
42         w   http.ResponseWriter
43         r   *http.Request
44 }
45
46 func (msg *httpRtmgrMsg) RetOk() {
47         msg.w.WriteHeader(200)
48 }
49
50 type testingHttpRtmgrControl struct {
51         desc       string
52         port       string
53         useChannel bool
54         msgChan    chan *httpRtmgrMsg
55 }
56
57 func (hc *testingHttpRtmgrControl) UseChannel(flag bool) {
58         hc.useChannel = flag
59 }
60
61 func (hc *testingHttpRtmgrControl) WaitReq(t *testing.T) *httpRtmgrMsg {
62         xapp.Logger.Info("(%s) handle_rtmgr_req", hc.desc)
63         select {
64         case msg := <-hc.msgChan:
65                 return msg
66         case <-time.After(15 * time.Second):
67                 testError(t, "(%s) Not Received RTMGR Subscription message within 15 secs", hc.desc)
68                 return nil
69         }
70         return nil
71 }
72
73 func (hc *testingHttpRtmgrControl) http_handler(w http.ResponseWriter, r *http.Request) {
74         var req rtmgr_models.XappSubscriptionData
75         err := json.NewDecoder(r.Body).Decode(&req)
76         if err != nil {
77                 xapp.Logger.Error("%s", err.Error())
78         }
79         xapp.Logger.Info("(%s) handling Address=%s Port=%d SubscriptionID=%d", hc.desc, *req.Address, *req.Port, *req.SubscriptionID)
80         msg := &httpRtmgrMsg{
81                 msg: &req,
82                 w:   w,
83                 r:   r,
84         }
85         if hc.useChannel {
86                 hc.msgChan <- msg
87         } else {
88                 msg.RetOk()
89         }
90 }
91
92 func (hc *testingHttpRtmgrControl) run() {
93         http.HandleFunc("/", hc.http_handler)
94         http.ListenAndServe("localhost:"+hc.port, nil)
95 }
96
97 func initTestingHttpRtmgrControl(desc string, port string) *testingHttpRtmgrControl {
98         hc := &testingHttpRtmgrControl{}
99         hc.desc = desc
100         hc.port = port
101         hc.useChannel = false
102         hc.msgChan = make(chan *httpRtmgrMsg)
103         return hc
104 }
105
106 //-----------------------------------------------------------------------------
107 //
108 //-----------------------------------------------------------------------------
109 type testingRmrControl struct {
110         desc     string
111         syncChan chan struct{}
112 }
113
114 func (tc *testingRmrControl) ReadyCB(data interface{}) {
115         xapp.Logger.Info("testingRmrControl(%s) ReadyCB", tc.desc)
116         tc.syncChan <- struct{}{}
117         return
118 }
119
120 func (tc *testingRmrControl) WaitCB() {
121         <-tc.syncChan
122 }
123
124 func initTestingControl(desc string, rtfile string, port string) testingRmrControl {
125         tc := testingRmrControl{}
126         os.Setenv("RMR_SEED_RT", rtfile)
127         os.Setenv("RMR_SRC_ID", "localhost:"+port)
128         xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
129         xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
130         tc.desc = strings.ToUpper(desc)
131         tc.syncChan = make(chan struct{})
132         return tc
133 }
134
135 //-----------------------------------------------------------------------------
136 //
137 //-----------------------------------------------------------------------------
138 type testingRmrStubControl struct {
139         testingRmrControl
140         rmrClientTest *xapp.RMRClient
141         active        bool
142 }
143
144 func (tc *testingRmrStubControl) RmrSend(params *RMRParams) (err error) {
145         //
146         //NOTE: Do this way until xapp-frame sending is improved
147         //
148         xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String())
149         status := false
150         i := 1
151         for ; i <= 10 && status == false; i++ {
152                 status = tc.rmrClientTest.SendMsg(params.RMRParams)
153                 if status == false {
154                         xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
155                         time.Sleep(500 * time.Millisecond)
156                 }
157         }
158         if status == false {
159                 err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
160                 xapp.Rmr.Free(params.Mbuf)
161         }
162         return
163 }
164
165 func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrStubControl {
166         tc := testingRmrStubControl{}
167         tc.active = false
168         tc.testingRmrControl = initTestingControl(desc, rtfile, port)
169         tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
170         tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
171         go tc.rmrClientTest.Start(consumer)
172         tc.WaitCB()
173         return tc
174 }
175
176 //-----------------------------------------------------------------------------
177 //
178 //-----------------------------------------------------------------------------
179 type testingMessageChannel struct {
180         rmrConChan chan *RMRParams
181 }
182
183 func initTestingMessageChannel() testingMessageChannel {
184         mc := testingMessageChannel{}
185         mc.rmrConChan = make(chan *RMRParams)
186         return mc
187 }
188
189 //-----------------------------------------------------------------------------
190 //
191 //-----------------------------------------------------------------------------
192 type xappTransaction struct {
193         tc   *testingXappControl
194         xid  string
195         meid *xapp.RMRMeid
196 }
197
198 type testingXappControl struct {
199         testingRmrStubControl
200         testingMessageChannel
201         xid_seq uint64
202 }
203
204 func (tc *testingXappControl) newXid() string {
205         var xid string
206         xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
207         tc.xid_seq++
208         return xid
209 }
210
211 func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
212         trans := &xappTransaction{}
213         trans.tc = tc
214         if xid == nil {
215                 trans.xid = tc.newXid()
216         } else {
217                 trans.xid = *xid
218         }
219         trans.meid = &xapp.RMRMeid{RanName: ranname}
220         return trans
221 }
222
223 func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
224         xapp.Rmr.Free(params.Mbuf)
225         params.Mbuf = nil
226         msg := &RMRParams{params}
227
228         if params.Mtype == 55555 {
229                 xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
230                 tc.active = true
231                 return
232         }
233
234         if strings.Contains(msg.Xid, tc.desc) {
235                 xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
236                 tc.rmrConChan <- msg
237         } else {
238                 xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String())
239         }
240         return
241 }
242
243 func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl {
244         xappCtrl := &testingXappControl{}
245         xappCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
246         xappCtrl.testingMessageChannel = initTestingMessageChannel()
247         xappCtrl.xid_seq = 1
248         return xappCtrl
249 }
250
251 //-----------------------------------------------------------------------------
252 //
253 //-----------------------------------------------------------------------------
254 type testingE2termControl struct {
255         testingRmrStubControl
256         testingMessageChannel
257 }
258
259 func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) {
260         xapp.Rmr.Free(params.Mbuf)
261         params.Mbuf = nil
262         msg := &RMRParams{params}
263
264         if params.Mtype == 55555 {
265                 xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
266                 tc.active = true
267                 return
268         }
269
270         xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
271         tc.rmrConChan <- msg
272         return
273 }
274
275 func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
276         e2termCtrl := &testingE2termControl{}
277         e2termCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
278         e2termCtrl.testingMessageChannel = initTestingMessageChannel()
279         return e2termCtrl
280 }
281
282 //-----------------------------------------------------------------------------
283 //
284 //-----------------------------------------------------------------------------
285 type testingMainControl struct {
286         testingRmrControl
287         c *Control
288 }
289
290 func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
291         mainCtrl = &testingMainControl{}
292         mainCtrl.testingRmrControl = initTestingControl(desc, rtfile, port)
293         mainCtrl.c = NewControl()
294         xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
295         go xapp.RunWithParams(mainCtrl.c, false)
296         mainCtrl.WaitCB()
297         return mainCtrl
298 }
299
300 //-----------------------------------------------------------------------------
301 //
302 //-----------------------------------------------------------------------------
303
304 func testError(t *testing.T, pattern string, args ...interface{}) {
305         xapp.Logger.Error(fmt.Sprintf(pattern, args...))
306         t.Errorf(fmt.Sprintf(pattern, args...))
307 }
308
309 func testLog(t *testing.T, pattern string, args ...interface{}) {
310         xapp.Logger.Info(fmt.Sprintf(pattern, args...))
311         t.Logf(fmt.Sprintf(pattern, args...))
312 }
313
314 func testCreateTmpFile(str string) (string, error) {
315         file, err := ioutil.TempFile("/tmp", "*.rt")
316         if err != nil {
317                 return "", err
318         }
319         _, err = file.WriteString(str)
320         if err != nil {
321                 file.Close()
322                 return "", err
323         }
324         return file.Name(), nil
325 }
326
327 //-----------------------------------------------------------------------------
328 //
329 //-----------------------------------------------------------------------------
330
331 var xappConn1 *testingXappControl
332 var xappConn2 *testingXappControl
333 var e2termConn *testingE2termControl
334 var mainCtrl *testingMainControl
335 var rtmgrHttp *testingHttpRtmgrControl
336
337 func TestMain(m *testing.M) {
338         xapp.Logger.Info("TestMain start")
339
340         //---------------------------------
341         //
342         //---------------------------------
343         rtmgrHttp = initTestingHttpRtmgrControl("RTMGRSTUB", "8989")
344         go rtmgrHttp.run()
345
346         //---------------------------------
347         //
348         //---------------------------------
349
350         //
351         //Cfg creation won't work like this as xapp-frame reads it during init.
352         //
353         /*
354             cfgstr:=`{
355               "local": {
356                   "host": ":8080"
357               },
358               "logger": {
359                   "level": 4
360               },
361               "rmr": {
362                  "protPort": "tcp:14560",
363                  "maxSize": 4096,
364                  "numWorkers": 1,
365                  "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
366                  "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
367               },
368               "db": {
369                   "host": "localhost",
370                   "port": 6379,
371                   "namespaces": ["sdl", "rnib"]
372               },
373                  "rtmgr" : {
374                    "HostAddr" : "localhost",
375                    "port" : "8989",
376                    "baseUrl" : "/"
377                  }
378            `
379
380            cfgfilename,_ := testCreateTmpFile(cfgstr)
381            defer os.Remove(cfgfilename)
382            os.Setenv("CFG_FILE", cfgfilename)
383         */
384         xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
385
386         //---------------------------------
387         // Static routetable for rmr
388         //
389         // NOTE: Routing table is configured so, that responses
390         //       are duplicated to xapp1 and xapp2 instances.
391         //       If XID is not matching xapp stub will just
392         //       drop message. (Messages 12011, 12012, 12021, 12022)
393         //
394         // 14560 submgr
395         // 15560 e2term stub
396         // 13560 xapp1 stub
397         // 13660 xapp2 stub
398         //
399         //---------------------------------
400
401         allrt := `newrt|start
402 mse|12010|-1|localhost:14560
403 mse|12010,localhost:14560|-1|localhost:15560
404 mse|12011,localhost:15560|-1|localhost:14560
405 mse|12012,localhost:15560|-1|localhost:14560
406 mse|12011,localhost:14560|-1|localhost:13660;localhost:13560
407 mse|12012,localhost:14560|-1|localhost:13660;localhost:13560
408 mse|12020|-1|localhost:14560
409 mse|12020,localhost:14560|-1|localhost:15560
410 mse|12021,localhost:15560|-1|localhost:14560
411 mse|12022,localhost:15560|-1|localhost:14560
412 mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
413 mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
414 mse|55555|-1|localhost:13660;localhost:13560,localhost:15560
415 newrt|end
416 `
417
418         //---------------------------------
419         //
420         //---------------------------------
421         xapp.Logger.Info("### submgr main run ###")
422
423         subsrt := allrt
424         /*
425                 subsrt := `newrt|start
426            mse|12010|-1|localhost:14560
427            mse|12010,localhost:14560|-1|localhost:15560
428            mse|12011,localhost:15560|-1|localhost:14560
429            mse|12011|-1|localhost:13560;localhost:13660
430            mse|12012,localhost:15560|-1|localhost:14560
431            mse|12012|-1|localhost:13560;localhost:13660
432            mse|12020|-1|localhost:14560
433            mse|12020,localhost:14560|-1|localhost:15560
434            mse|12021,localhost:15560|-1|localhost:14560
435            mse|12021|-1|localhost:13560;localhost:13660
436            mse|12022,localhost:15560|-1|localhost:14560
437            mse|12022|-1|localhost:13560;localhost:13660
438            newrt|end
439            `
440         */
441
442         subrtfilename, _ := testCreateTmpFile(subsrt)
443         defer os.Remove(subrtfilename)
444         mainCtrl = createNewMainControl("main", subrtfilename, "14560")
445
446         //---------------------------------
447         //
448         //---------------------------------
449         xapp.Logger.Info("### xapp1 rmr run ###")
450
451         xapprt1 := allrt
452         /*
453                 xapprt1 := `newrt|start
454            mse|12010|-1|localhost:14560
455            mse|12011|-1|localhost:13560
456            mse|12012|-1|localhost:13560
457            mse|12020|-1|localhost:14560
458            mse|12021|-1|localhost:13560
459            mse|12022|-1|localhost:13560
460            newrt|end
461            `
462         */
463
464         xapprtfilename1, _ := testCreateTmpFile(xapprt1)
465         defer os.Remove(xapprtfilename1)
466         xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB")
467
468         //---------------------------------
469         //
470         //---------------------------------
471
472         xapp.Logger.Info("### xapp2 rmr run ###")
473
474         xapprt2 := allrt
475         /*
476                 xapprt2 := `newrt|start
477            mse|12010|-1|localhost:14560
478            mse|12011|-1|localhost:13660
479            mse|12012|-1|localhost:13660
480            mse|12020|-1|localhost:14560
481            mse|12021|-1|localhost:13660
482            mse|12022|-1|localhost:13660
483            newrt|end
484            `
485         */
486
487         xapprtfilename2, _ := testCreateTmpFile(xapprt2)
488         defer os.Remove(xapprtfilename2)
489         xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB")
490
491         //---------------------------------
492         //
493         //---------------------------------
494         xapp.Logger.Info("### e2term rmr run ###")
495
496         e2termrt := allrt
497         /*
498                 e2termrt := `newrt|start
499            mse|12010|-1|localhost:15560
500            mse|12011|-1|localhost:14560
501            mse|12012|-1|localhost:14560
502            mse|12020|-1|localhost:15560
503            mse|12021|-1|localhost:14560
504            mse|12022|-1|localhost:14560
505            newrt|end
506            `
507         */
508
509         e2termrtfilename, _ := testCreateTmpFile(e2termrt)
510         defer os.Remove(e2termrtfilename)
511         e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
512
513         //---------------------------------
514         // Testing message sending
515         //---------------------------------
516         var dummyBuf []byte = make([]byte, 100)
517
518         params := &RMRParams{&xapp.RMRParams{}}
519         params.Mtype = 55555
520         params.SubId = -1
521         params.Payload = dummyBuf
522         params.PayloadLen = 100
523         params.Meid = &xapp.RMRMeid{RanName: "NONEXISTINGRAN"}
524         params.Xid = "THISISTESTFORSTUBS"
525         params.Mbuf = nil
526
527         status := false
528         i := 1
529         for ; i <= 10 && status == false; i++ {
530                 xapp.Rmr.Send(params.RMRParams, false)
531                 if e2termConn.active == true && xappConn1.active == true && xappConn2.active == true {
532                         status = true
533                         break
534                 } else {
535                         xapp.Logger.Info("Sleep 0.5 secs and try routes again")
536                         time.Sleep(500 * time.Millisecond)
537                 }
538         }
539
540         if status == false {
541                 xapp.Logger.Error("Could not initialize routes")
542                 os.Exit(1)
543         }
544
545         //---------------------------------
546         //
547         //---------------------------------
548         code := m.Run()
549         os.Exit(code)
550 }