d3d7b57acfcaccf24a315908ba784373398ebec7
[ric-plt/submgr.git] / pkg / control / main_test.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "fmt"
25         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
26         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
27         "io/ioutil"
28         "net/http"
29         "os"
30         "strconv"
31         "strings"
32         "sync"
33         "testing"
34         "time"
35 )
36
37 //-----------------------------------------------------------------------------
38 //
39 //-----------------------------------------------------------------------------
40
41 type httpEventWaiter struct {
42         resultChan   chan bool
43         nextActionOk bool
44 }
45
46 func (msg *httpEventWaiter) SetResult(res bool) {
47         msg.resultChan <- res
48 }
49
50 func (msg *httpEventWaiter) WaitResult(t *testing.T) bool {
51         select {
52         case result := <-msg.resultChan:
53                 return result
54         case <-time.After(15 * time.Second):
55                 testError(t, "Waiter not received result status from case within 15 secs")
56                 return false
57         }
58         testError(t, "Waiter error in default branch")
59         return false
60 }
61
62 //-----------------------------------------------------------------------------
63 //
64 //-----------------------------------------------------------------------------
65 type testingHttpRtmgrControl struct {
66         sync.Mutex
67         desc        string
68         port        string
69         eventWaiter *httpEventWaiter
70 }
71
72 func (hc *testingHttpRtmgrControl) NextEvent(eventWaiter *httpEventWaiter) {
73         hc.Lock()
74         defer hc.Unlock()
75         hc.eventWaiter = eventWaiter
76 }
77
78 func (hc *testingHttpRtmgrControl) AllocNextEvent(nextAction bool) *httpEventWaiter {
79         eventWaiter := &httpEventWaiter{
80                 resultChan:   make(chan bool),
81                 nextActionOk: nextAction,
82         }
83         hc.NextEvent(eventWaiter)
84         return eventWaiter
85 }
86
87 func (hc *testingHttpRtmgrControl) http_handler(w http.ResponseWriter, r *http.Request) {
88
89         hc.Lock()
90         defer hc.Unlock()
91
92         var req rtmgr_models.XappSubscriptionData
93         err := json.NewDecoder(r.Body).Decode(&req)
94         if err != nil {
95                 xapp.Logger.Error("%s", err.Error())
96         }
97         xapp.Logger.Info("(%s) handling Address=%s Port=%d SubscriptionID=%d", hc.desc, *req.Address, *req.Port, *req.SubscriptionID)
98
99         var code int = 0
100         switch r.Method {
101         case http.MethodPost:
102                 code = 201
103                 if hc.eventWaiter != nil {
104                         if hc.eventWaiter.nextActionOk == false {
105                                 code = 400
106                         }
107                 }
108         case http.MethodDelete:
109                 code = 200
110                 if hc.eventWaiter != nil {
111                         if hc.eventWaiter.nextActionOk == false {
112                                 code = 400
113                         }
114                 }
115         default:
116                 code = 200
117         }
118
119         waiter := hc.eventWaiter
120         hc.eventWaiter = nil
121         if waiter != nil {
122                 waiter.SetResult(true)
123         }
124         xapp.Logger.Info("(%s) Method=%s Reply with code %d", hc.desc, r.Method, code)
125         w.WriteHeader(code)
126
127 }
128
129 func (hc *testingHttpRtmgrControl) run() {
130         http.HandleFunc("/", hc.http_handler)
131         http.ListenAndServe("localhost:"+hc.port, nil)
132 }
133
134 func initTestingHttpRtmgrControl(desc string, port string) *testingHttpRtmgrControl {
135         hc := &testingHttpRtmgrControl{}
136         hc.desc = desc
137         hc.port = port
138         return hc
139 }
140
141 //-----------------------------------------------------------------------------
142 //
143 //-----------------------------------------------------------------------------
144 type testingRmrControl struct {
145         desc     string
146         syncChan chan struct{}
147 }
148
149 func (tc *testingRmrControl) ReadyCB(data interface{}) {
150         xapp.Logger.Info("testingRmrControl(%s) ReadyCB", tc.desc)
151         tc.syncChan <- struct{}{}
152         return
153 }
154
155 func (tc *testingRmrControl) WaitCB() {
156         <-tc.syncChan
157 }
158
159 func initTestingControl(desc string, rtfile string, port string) testingRmrControl {
160         tc := testingRmrControl{}
161         os.Setenv("RMR_SEED_RT", rtfile)
162         os.Setenv("RMR_SRC_ID", "localhost:"+port)
163         xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
164         xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
165         tc.desc = strings.ToUpper(desc)
166         tc.syncChan = make(chan struct{})
167         return tc
168 }
169
170 //-----------------------------------------------------------------------------
171 //
172 //-----------------------------------------------------------------------------
173 type testingRmrStubControl struct {
174         testingRmrControl
175         rmrClientTest *xapp.RMRClient
176         active        bool
177 }
178
179 func (tc *testingRmrStubControl) RmrSend(params *RMRParams) (err error) {
180         //
181         //NOTE: Do this way until xapp-frame sending is improved
182         //
183         xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String())
184         status := false
185         i := 1
186         for ; i <= 10 && status == false; i++ {
187                 status = tc.rmrClientTest.SendMsg(params.RMRParams)
188                 if status == false {
189                         xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
190                         time.Sleep(500 * time.Millisecond)
191                 }
192         }
193         if status == false {
194                 err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
195                 xapp.Rmr.Free(params.Mbuf)
196         }
197         return
198 }
199
200 func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrStubControl {
201         tc := testingRmrStubControl{}
202         tc.active = false
203         tc.testingRmrControl = initTestingControl(desc, rtfile, port)
204         tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
205         tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
206         go tc.rmrClientTest.Start(consumer)
207         tc.WaitCB()
208         return tc
209 }
210
211 //-----------------------------------------------------------------------------
212 //
213 //-----------------------------------------------------------------------------
214 type testingMessageChannel struct {
215         rmrConChan chan *RMRParams
216 }
217
218 func initTestingMessageChannel() testingMessageChannel {
219         mc := testingMessageChannel{}
220         mc.rmrConChan = make(chan *RMRParams)
221         return mc
222 }
223
224 //-----------------------------------------------------------------------------
225 //
226 //-----------------------------------------------------------------------------
227 type xappTransaction struct {
228         tc   *testingXappControl
229         xid  string
230         meid *xapp.RMRMeid
231 }
232
233 type testingXappControl struct {
234         testingRmrStubControl
235         testingMessageChannel
236         xid_seq uint64
237 }
238
239 func (tc *testingXappControl) newXid() string {
240         var xid string
241         xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
242         tc.xid_seq++
243         return xid
244 }
245
246 func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
247         trans := &xappTransaction{}
248         trans.tc = tc
249         if xid == nil {
250                 trans.xid = tc.newXid()
251         } else {
252                 trans.xid = *xid
253         }
254         trans.meid = &xapp.RMRMeid{RanName: ranname}
255         return trans
256 }
257
258 func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
259         xapp.Rmr.Free(params.Mbuf)
260         params.Mbuf = nil
261         msg := &RMRParams{params}
262
263         if params.Mtype == 55555 {
264                 xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
265                 tc.active = true
266                 return
267         }
268
269         if strings.Contains(msg.Xid, tc.desc) {
270                 xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
271                 tc.rmrConChan <- msg
272         } else {
273                 xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String())
274         }
275         return
276 }
277
278 func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl {
279         xappCtrl := &testingXappControl{}
280         xappCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
281         xappCtrl.testingMessageChannel = initTestingMessageChannel()
282         xappCtrl.xid_seq = 1
283         return xappCtrl
284 }
285
286 //-----------------------------------------------------------------------------
287 //
288 //-----------------------------------------------------------------------------
289 type testingE2termControl struct {
290         testingRmrStubControl
291         testingMessageChannel
292 }
293
294 func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) {
295         xapp.Rmr.Free(params.Mbuf)
296         params.Mbuf = nil
297         msg := &RMRParams{params}
298
299         if params.Mtype == 55555 {
300                 xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
301                 tc.active = true
302                 return
303         }
304
305         xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
306         tc.rmrConChan <- msg
307         return
308 }
309
310 func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
311         e2termCtrl := &testingE2termControl{}
312         e2termCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
313         e2termCtrl.testingMessageChannel = initTestingMessageChannel()
314         return e2termCtrl
315 }
316
317 //-----------------------------------------------------------------------------
318 //
319 //-----------------------------------------------------------------------------
320 type testingMainControl struct {
321         testingRmrControl
322         c *Control
323 }
324
325 func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
326         mainCtrl = &testingMainControl{}
327         mainCtrl.testingRmrControl = initTestingControl(desc, rtfile, port)
328         mainCtrl.c = NewControl()
329         xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
330         go xapp.RunWithParams(mainCtrl.c, false)
331         mainCtrl.WaitCB()
332         return mainCtrl
333 }
334
335 //-----------------------------------------------------------------------------
336 //
337 //-----------------------------------------------------------------------------
338
339 func testError(t *testing.T, pattern string, args ...interface{}) {
340         xapp.Logger.Error(fmt.Sprintf(pattern, args...))
341         t.Errorf(fmt.Sprintf(pattern, args...))
342 }
343
344 func testLog(t *testing.T, pattern string, args ...interface{}) {
345         xapp.Logger.Info(fmt.Sprintf(pattern, args...))
346         t.Logf(fmt.Sprintf(pattern, args...))
347 }
348
349 func testCreateTmpFile(str string) (string, error) {
350         file, err := ioutil.TempFile("/tmp", "*.rt")
351         if err != nil {
352                 return "", err
353         }
354         _, err = file.WriteString(str)
355         if err != nil {
356                 file.Close()
357                 return "", err
358         }
359         return file.Name(), nil
360 }
361
362 //-----------------------------------------------------------------------------
363 //
364 //-----------------------------------------------------------------------------
365
366 var xappConn1 *testingXappControl
367 var xappConn2 *testingXappControl
368 var e2termConn *testingE2termControl
369 var mainCtrl *testingMainControl
370 var rtmgrHttp *testingHttpRtmgrControl
371
372 func TestMain(m *testing.M) {
373         xapp.Logger.Info("TestMain start")
374
375         //---------------------------------
376         //
377         //---------------------------------
378         rtmgrHttp = initTestingHttpRtmgrControl("RTMGRSTUB", "8989")
379         go rtmgrHttp.run()
380
381         //---------------------------------
382         //
383         //---------------------------------
384
385         //
386         //Cfg creation won't work like this as xapp-frame reads it during init.
387         //
388         /*
389             cfgstr:=`{
390               "local": {
391                   "host": ":8080"
392               },
393               "logger": {
394                   "level": 4
395               },
396               "rmr": {
397                  "protPort": "tcp:14560",
398                  "maxSize": 4096,
399                  "numWorkers": 1,
400                  "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
401                  "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
402               },
403               "db": {
404                   "host": "localhost",
405                   "port": 6379,
406                   "namespaces": ["sdl", "rnib"]
407               },
408                  "rtmgr" : {
409                    "HostAddr" : "localhost",
410                    "port" : "8989",
411                    "baseUrl" : "/"
412                  }
413            `
414
415            cfgfilename,_ := testCreateTmpFile(cfgstr)
416            defer os.Remove(cfgfilename)
417            os.Setenv("CFG_FILE", cfgfilename)
418         */
419         xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
420
421         //---------------------------------
422         // Static routetable for rmr
423         //
424         // NOTE: Routing table is configured so, that responses
425         //       are duplicated to xapp1 and xapp2 instances.
426         //       If XID is not matching xapp stub will just
427         //       drop message. (Messages 12011, 12012, 12021, 12022)
428         //
429         // 14560 submgr
430         // 15560 e2term stub
431         // 13560 xapp1 stub
432         // 13660 xapp2 stub
433         //
434         //---------------------------------
435
436         allrt := `newrt|start
437 mse|12010|-1|localhost:14560
438 mse|12010,localhost:14560|-1|localhost:15560
439 mse|12011,localhost:15560|-1|localhost:14560
440 mse|12012,localhost:15560|-1|localhost:14560
441 mse|12011,localhost:14560|-1|localhost:13660;localhost:13560
442 mse|12012,localhost:14560|-1|localhost:13660;localhost:13560
443 mse|12020|-1|localhost:14560
444 mse|12020,localhost:14560|-1|localhost:15560
445 mse|12021,localhost:15560|-1|localhost:14560
446 mse|12022,localhost:15560|-1|localhost:14560
447 mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
448 mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
449 mse|55555|-1|localhost:13660;localhost:13560,localhost:15560
450 newrt|end
451 `
452
453         //---------------------------------
454         //
455         //---------------------------------
456         xapp.Logger.Info("### submgr main run ###")
457
458         subsrt := allrt
459         /*
460                 subsrt := `newrt|start
461            mse|12010|-1|localhost:14560
462            mse|12010,localhost:14560|-1|localhost:15560
463            mse|12011,localhost:15560|-1|localhost:14560
464            mse|12011|-1|localhost:13560;localhost:13660
465            mse|12012,localhost:15560|-1|localhost:14560
466            mse|12012|-1|localhost:13560;localhost:13660
467            mse|12020|-1|localhost:14560
468            mse|12020,localhost:14560|-1|localhost:15560
469            mse|12021,localhost:15560|-1|localhost:14560
470            mse|12021|-1|localhost:13560;localhost:13660
471            mse|12022,localhost:15560|-1|localhost:14560
472            mse|12022|-1|localhost:13560;localhost:13660
473            newrt|end
474            `
475         */
476
477         subrtfilename, _ := testCreateTmpFile(subsrt)
478         defer os.Remove(subrtfilename)
479         mainCtrl = createNewMainControl("main", subrtfilename, "14560")
480
481         //---------------------------------
482         //
483         //---------------------------------
484         xapp.Logger.Info("### xapp1 rmr run ###")
485
486         xapprt1 := allrt
487         /*
488                 xapprt1 := `newrt|start
489            mse|12010|-1|localhost:14560
490            mse|12011|-1|localhost:13560
491            mse|12012|-1|localhost:13560
492            mse|12020|-1|localhost:14560
493            mse|12021|-1|localhost:13560
494            mse|12022|-1|localhost:13560
495            newrt|end
496            `
497         */
498
499         xapprtfilename1, _ := testCreateTmpFile(xapprt1)
500         defer os.Remove(xapprtfilename1)
501         xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB")
502
503         //---------------------------------
504         //
505         //---------------------------------
506
507         xapp.Logger.Info("### xapp2 rmr run ###")
508
509         xapprt2 := allrt
510         /*
511                 xapprt2 := `newrt|start
512            mse|12010|-1|localhost:14560
513            mse|12011|-1|localhost:13660
514            mse|12012|-1|localhost:13660
515            mse|12020|-1|localhost:14560
516            mse|12021|-1|localhost:13660
517            mse|12022|-1|localhost:13660
518            newrt|end
519            `
520         */
521
522         xapprtfilename2, _ := testCreateTmpFile(xapprt2)
523         defer os.Remove(xapprtfilename2)
524         xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB")
525
526         //---------------------------------
527         //
528         //---------------------------------
529         xapp.Logger.Info("### e2term rmr run ###")
530
531         e2termrt := allrt
532         /*
533                 e2termrt := `newrt|start
534            mse|12010|-1|localhost:15560
535            mse|12011|-1|localhost:14560
536            mse|12012|-1|localhost:14560
537            mse|12020|-1|localhost:15560
538            mse|12021|-1|localhost:14560
539            mse|12022|-1|localhost:14560
540            newrt|end
541            `
542         */
543
544         e2termrtfilename, _ := testCreateTmpFile(e2termrt)
545         defer os.Remove(e2termrtfilename)
546         e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
547
548         //---------------------------------
549         // Testing message sending
550         //---------------------------------
551         var dummyBuf []byte = make([]byte, 100)
552
553         params := &RMRParams{&xapp.RMRParams{}}
554         params.Mtype = 55555
555         params.SubId = -1
556         params.Payload = dummyBuf
557         params.PayloadLen = 100
558         params.Meid = &xapp.RMRMeid{RanName: "NONEXISTINGRAN"}
559         params.Xid = "THISISTESTFORSTUBS"
560         params.Mbuf = nil
561
562         status := false
563         i := 1
564         for ; i <= 10 && status == false; i++ {
565                 xapp.Rmr.Send(params.RMRParams, false)
566                 if e2termConn.active == true && xappConn1.active == true && xappConn2.active == true {
567                         status = true
568                         break
569                 } else {
570                         xapp.Logger.Info("Sleep 0.5 secs and try routes again")
571                         time.Sleep(500 * time.Millisecond)
572                 }
573         }
574
575         if status == false {
576                 xapp.Logger.Error("Could not initialize routes")
577                 os.Exit(1)
578         }
579
580         //---------------------------------
581         //
582         //---------------------------------
583         code := m.Run()
584         os.Exit(code)
585 }