69dadff13bf1bf8d6368e96ec4eee859ae14807c
[ric-plt/submgr.git] / pkg / control / main_test.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package control
21
22 import (
23         "encoding/json"
24         "errors"
25         "fmt"
26         "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
27         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28         "io/ioutil"
29         "net/http"
30         "os"
31         "strconv"
32         "strings"
33         "testing"
34         "time"
35 )
36
37 //-----------------------------------------------------------------------------
38 //
39 //-----------------------------------------------------------------------------
40 type testingControl struct {
41         desc     string
42         syncChan chan struct{}
43 }
44
45 func (tc *testingControl) ReadyCB(data interface{}) {
46         xapp.Logger.Info("testingControl(%s) ReadyCB", tc.desc)
47         tc.syncChan <- struct{}{}
48         return
49 }
50
51 func (tc *testingControl) WaitCB() {
52         <-tc.syncChan
53 }
54
55 func initTestingControl(desc string, rtfile string, port string) testingControl {
56         tc := testingControl{}
57         os.Setenv("RMR_SEED_RT", rtfile)
58         os.Setenv("RMR_SRC_ID", "localhost:"+port)
59         xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
60         xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
61         tc.desc = strings.ToUpper(desc)
62         tc.syncChan = make(chan struct{})
63         return tc
64 }
65
66 //-----------------------------------------------------------------------------
67 //
68 //-----------------------------------------------------------------------------
69 type testingRmrControl struct {
70         testingControl
71         rmrClientTest *xapp.RMRClient
72 }
73
74 func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
75         //
76         //NOTE: Do this way until xapp-frame sending is improved
77         //
78         status := false
79         i := 1
80         for ; i <= 10 && status == false; i++ {
81                 status = tc.rmrClientTest.SendMsg(params)
82                 if status == false {
83                         xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
84                         time.Sleep(500 * time.Millisecond)
85                 }
86         }
87         if status == false {
88                 err = errors.New("rmr.Send() failed")
89                 tc.rmrClientTest.Free(params.Mbuf)
90         }
91         return
92 }
93
94 func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
95         tc := testingRmrControl{}
96         tc.testingControl = initTestingControl(desc, rtfile, port)
97         tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
98         tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
99         go tc.rmrClientTest.Start(consumer)
100         tc.WaitCB()
101         return tc
102 }
103
104 //-----------------------------------------------------------------------------
105 //
106 //-----------------------------------------------------------------------------
107 type testingMessageChannel struct {
108         rmrConChan chan *xapp.RMRParams
109 }
110
111 func initTestingMessageChannel() testingMessageChannel {
112         mc := testingMessageChannel{}
113         mc.rmrConChan = make(chan *xapp.RMRParams)
114         return mc
115 }
116
117 //-----------------------------------------------------------------------------
118 //
119 //-----------------------------------------------------------------------------
120 //-----------------------------------------------------------------------------
121 //
122 //-----------------------------------------------------------------------------
123 type xappTransaction struct {
124         tc   *testingXappControl
125         xid  string
126         meid *xapp.RMRMeid
127 }
128
129 type testingXappControl struct {
130         testingRmrControl
131         testingMessageChannel
132         xid_seq uint64
133 }
134
135 func (tc *testingXappControl) newXid() string {
136         var xid string
137         xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
138         tc.xid_seq++
139         return xid
140 }
141
142 func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
143         trans := &xappTransaction{}
144         trans.tc = tc
145         if xid == nil {
146                 trans.xid = tc.newXid()
147         } else {
148                 trans.xid = *xid
149         }
150         trans.meid = &xapp.RMRMeid{RanName: ranname}
151         return trans
152 }
153
154 func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
155
156         if strings.Contains(msg.Xid, tc.desc) {
157                 xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
158                 tc.rmrConChan <- msg
159         } else {
160                 xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid, tc.desc)
161         }
162         return
163 }
164
165 func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl {
166         xappCtrl := &testingXappControl{}
167         xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
168         xappCtrl.testingMessageChannel = initTestingMessageChannel()
169         xappCtrl.xid_seq = 1
170         return xappCtrl
171 }
172
173 //-----------------------------------------------------------------------------
174 //
175 //-----------------------------------------------------------------------------
176 type testingE2termControl struct {
177         testingRmrControl
178         testingMessageChannel
179 }
180
181 func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) {
182         xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
183         tc.rmrConChan <- msg
184         return
185 }
186
187 func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
188         e2termCtrl := &testingE2termControl{}
189         e2termCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
190         e2termCtrl.testingMessageChannel = initTestingMessageChannel()
191         return e2termCtrl
192 }
193
194 //-----------------------------------------------------------------------------
195 //
196 //-----------------------------------------------------------------------------
197 type testingMainControl struct {
198         testingControl
199         c *Control
200 }
201
202 func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
203         mainCtrl = &testingMainControl{}
204         mainCtrl.testingControl = initTestingControl(desc, rtfile, port)
205         mainCtrl.c = NewControl()
206         xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
207         go xapp.RunWithParams(mainCtrl.c, false)
208         mainCtrl.WaitCB()
209         return mainCtrl
210 }
211
212 //-----------------------------------------------------------------------------
213 //
214 //-----------------------------------------------------------------------------
215
216 func testError(t *testing.T, pattern string, args ...interface{}) {
217         xapp.Logger.Error(fmt.Sprintf(pattern, args...))
218         t.Errorf(fmt.Sprintf(pattern, args...))
219 }
220
221 func testLog(t *testing.T, pattern string, args ...interface{}) {
222         xapp.Logger.Info(fmt.Sprintf(pattern, args...))
223         t.Logf(fmt.Sprintf(pattern, args...))
224 }
225
226 func testCreateTmpFile(str string) (string, error) {
227         file, err := ioutil.TempFile("/tmp", "*.rt")
228         if err != nil {
229                 return "", err
230         }
231         _, err = file.WriteString(str)
232         if err != nil {
233                 file.Close()
234                 return "", err
235         }
236         return file.Name(), nil
237 }
238
239 //-----------------------------------------------------------------------------
240 //
241 //-----------------------------------------------------------------------------
242
243 var xappConn1 *testingXappControl
244 var xappConn2 *testingXappControl
245 var e2termConn *testingE2termControl
246 var mainCtrl *testingMainControl
247
248 func TestMain(m *testing.M) {
249         xapp.Logger.Info("TestMain start")
250
251         //
252         //Cfg creation won't work like this as xapp-frame reads it during init.
253         //
254         /*
255             cfgstr:=`{
256               "local": {
257                   "host": ":8080"
258               },
259               "logger": {
260                   "level": 4
261               },
262               "rmr": {
263                  "protPort": "tcp:14560",
264                  "maxSize": 4096,
265                  "numWorkers": 1,
266                  "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
267                  "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
268               },
269               "db": {
270                   "host": "localhost",
271                   "port": 6379,
272                   "namespaces": ["sdl", "rnib"]
273               },
274                  "rtmgr" : {
275                    "HostAddr" : "localhost",
276                    "port" : "8989",
277                    "baseUrl" : "/"
278                  }
279            `
280
281            cfgfilename,_ := testCreateTmpFile(cfgstr)
282            defer os.Remove(cfgfilename)
283            os.Setenv("CFG_FILE", cfgfilename)
284         */
285         xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
286
287         //---------------------------------
288         // NOTE: Routing table is configured so, that responses
289         //       are duplicated to xapp1 and xapp2 instances.
290         //       If XID is not matching xapp stub will just
291         //       drop message. (Messages 12011, 12012, 12021, 12022)
292         //---------------------------------
293         xapp.Logger.Info("### submgr main run ###")
294
295         subsrt := `newrt|start
296 mse|12010|-1|localhost:14560
297 mse|12010,localhost:14560|-1|localhost:15560
298 mse|12011,localhost:15560|-1|localhost:14560
299 mse|12011|-1|localhost:13560;localhost:13660
300 mse|12012,localhost:15560|-1|localhost:14560
301 mse|12012|-1|localhost:13560;localhost:13660
302 mse|12020|-1|localhost:14560
303 mse|12020,localhost:14560|-1|localhost:15560
304 mse|12021,localhost:15560|-1|localhost:14560
305 mse|12021|-1|localhost:13560;localhost:13660
306 mse|12022,localhost:15560|-1|localhost:14560
307 mse|12022|-1|localhost:13560;localhost:13660
308 newrt|end
309 `
310         subrtfilename, _ := testCreateTmpFile(subsrt)
311         defer os.Remove(subrtfilename)
312         mainCtrl = createNewMainControl("main", subrtfilename, "14560")
313
314         //---------------------------------
315         //
316         //---------------------------------
317         xapp.Logger.Info("### xapp1 rmr run ###")
318
319         xapprt1 := `newrt|start
320 mse|12010|-1|localhost:14560
321 mse|12011|-1|localhost:13560
322 mse|12012|-1|localhost:13560
323 mse|12020|-1|localhost:14560
324 mse|12021|-1|localhost:13560
325 mse|12022|-1|localhost:13560
326 newrt|end
327 `
328
329         xapprtfilename1, _ := testCreateTmpFile(xapprt1)
330         defer os.Remove(xapprtfilename1)
331         xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB")
332
333         //---------------------------------
334         //
335         //---------------------------------
336
337         xapp.Logger.Info("### xapp2 rmr run ###")
338
339         xapprt2 := `newrt|start
340 mse|12010|-1|localhost:14560
341 mse|12011|-1|localhost:13660
342 mse|12012|-1|localhost:13660
343 mse|12020|-1|localhost:14560
344 mse|12021|-1|localhost:13660
345 mse|12022|-1|localhost:13660
346 newrt|end
347 `
348
349         xapprtfilename2, _ := testCreateTmpFile(xapprt2)
350         defer os.Remove(xapprtfilename2)
351         xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB")
352
353         //---------------------------------
354         //
355         //---------------------------------
356         xapp.Logger.Info("### e2term rmr run ###")
357
358         e2termrt := `newrt|start
359 mse|12010|-1|localhost:15560
360 mse|12011|-1|localhost:14560
361 mse|12012|-1|localhost:14560
362 mse|12020|-1|localhost:15560
363 mse|12021|-1|localhost:14560
364 mse|12022|-1|localhost:14560
365 newrt|end
366 `
367
368         e2termrtfilename, _ := testCreateTmpFile(e2termrt)
369         defer os.Remove(e2termrtfilename)
370         e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
371
372         //---------------------------------
373         //
374         //---------------------------------
375         http_handler := func(w http.ResponseWriter, r *http.Request) {
376                 var req rtmgr_models.XappSubscriptionData
377                 err := json.NewDecoder(r.Body).Decode(&req)
378                 if err != nil {
379                         xapp.Logger.Error("%s", err.Error())
380                 }
381                 xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
382
383                 w.WriteHeader(200)
384         }
385
386         go func() {
387                 http.HandleFunc("/", http_handler)
388                 http.ListenAndServe("localhost:8989", nil)
389         }()
390
391         //---------------------------------
392         //
393         //---------------------------------
394         code := m.Run()
395         os.Exit(code)
396 }