From 622bae3417eaaacfb798a82d530363f3c278e993 Mon Sep 17 00:00:00 2001 From: Juha Hyttinen Date: Mon, 10 Aug 2020 08:20:22 +0300 Subject: [PATCH] General capabilities for xapps - rmrendpoint helper structs - rmr SendRetry to retry sending for given time (seconds). Tries every 500ms until given timeout is reached or message was successfully sent - few String() functions Change-Id: I6bb163d85bb35a5375cd0b053d8968ebb0042bc3 Signed-off-by: Juha Hyttinen --- ci/Dockerfile | 2 +- pkg/xapp/metrics.go | 12 +++++ pkg/xapp/rmr.go | 33 ++++++++++++++ pkg/xapp/rmrendpoint.go | 75 +++++++++++++++++++++++++++++++ pkg/xapp/rmrendpoint_test.go | 54 +++++++++++++++++++++++ pkg/xapp/rmrendpointlist.go | 94 +++++++++++++++++++++++++++++++++++++++ pkg/xapp/rmrendpointlist_test.go | 95 ++++++++++++++++++++++++++++++++++++++++ pkg/xapp/types.go | 37 ++++++++++++++++ 8 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 pkg/xapp/rmrendpoint.go create mode 100644 pkg/xapp/rmrendpoint_test.go create mode 100644 pkg/xapp/rmrendpointlist.go create mode 100644 pkg/xapp/rmrendpointlist_test.go diff --git a/ci/Dockerfile b/ci/Dockerfile index 01e768d..a37b995 100755 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -17,7 +17,7 @@ # #---------------------------------------------------------- -FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:8-u18.04 as xapp-base +FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:9-u18.04 as xapp-base RUN apt-get update -y \ &&apt-get install -y \ apt-utils \ diff --git a/pkg/xapp/metrics.go b/pkg/xapp/metrics.go index aa4b0f7..c943db6 100644 --- a/pkg/xapp/metrics.go +++ b/pkg/xapp/metrics.go @@ -42,6 +42,18 @@ type MetricGroupsCache struct { Gauges map[string]Gauge } +func (met *MetricGroupsCache) CInc(metric string) { + met.Counters[metric].Inc() +} + +func (met *MetricGroupsCache) CAdd(metric string, val float64) { + met.Counters[metric].Add(val) +} + +func (met *MetricGroupsCache) GSet(metric string, val float64) { + met.Gauges[metric].Set(val) +} + //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go index 90bc64b..9b91e1a 100755 --- a/pkg/xapp/rmr.go +++ b/pkg/xapp/rmr.go @@ -64,6 +64,8 @@ int wait_epoll(int epoll_fd,int rcv_fd) { import "C" import ( + "bytes" + "crypto/md5" "fmt" "github.com/spf13/viper" "strings" @@ -98,6 +100,9 @@ var RMRErrors = map[int]string{ C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request", } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type RMRParams struct { Mtype int Payload []byte @@ -113,6 +118,15 @@ type RMRParams struct { status int } +func (params *RMRParams) String() string { + var b bytes.Buffer + fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload)) + return b.String() +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient { p := C.CString(protPort) m := C.int(maxSize) @@ -268,6 +282,25 @@ func (m *RMRClient) SendRts(params *RMRParams) bool { return m.Send(params, true) } +func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) { + status := m.Send(params, isRts) + i := 0 + for ; i < int(to)*2 && status == false; i++ { + status = m.Send(params, isRts) + if status == false { + time.Sleep(500 * time.Millisecond) + } + } + if status == false { + err = fmt.Errorf("Failed with retries(%d) %s", i, params.String()) + if params.Mbuf != nil { + m.Free(params.Mbuf) + params.Mbuf = nil + } + } + return +} + func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t { if params.Mbuf != nil { m.Free(params.Mbuf) diff --git a/pkg/xapp/rmrendpoint.go b/pkg/xapp/rmrendpoint.go new file mode 100644 index 0000000..cddcc4a --- /dev/null +++ b/pkg/xapp/rmrendpoint.go @@ -0,0 +1,75 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package xapp + +import ( + "strconv" + "strings" +) + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type RmrEndpoint struct { + Addr string // xapp addr + Port uint16 // xapp port +} + +func (endpoint RmrEndpoint) String() string { + return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10) +} + +func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool { + if (endpoint.Addr == ep.Addr) && + (endpoint.Port == ep.Port) { + return true + } + return false +} + +func (endpoint *RmrEndpoint) GetAddr() string { + return endpoint.Addr +} + +func (endpoint *RmrEndpoint) GetPort() uint16 { + return endpoint.Port +} + +func (endpoint *RmrEndpoint) Set(src string) bool { + elems := strings.Split(src, ":") + if len(elems) == 2 { + srcAddr := elems[0] + srcPort, err := strconv.ParseUint(elems[1], 10, 16) + if err == nil { + endpoint.Addr = srcAddr + endpoint.Port = uint16(srcPort) + return true + } + } + return false +} + +func NewRmrEndpoint(src string) *RmrEndpoint { + ep := &RmrEndpoint{} + if ep.Set(src) == false { + return nil + } + return ep +} diff --git a/pkg/xapp/rmrendpoint_test.go b/pkg/xapp/rmrendpoint_test.go new file mode 100644 index 0000000..bea0f0b --- /dev/null +++ b/pkg/xapp/rmrendpoint_test.go @@ -0,0 +1,54 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package xapp + +import ( + "testing" +) + +func TestRmrEndpoint(t *testing.T) { + Logger.Info("CASE: TestRmrEndpoint") + + testEp := func(t *testing.T, val string, expect *RmrEndpoint) { + res := NewRmrEndpoint(val) + + if expect == nil && res == nil { + return + } + if res == nil { + t.Errorf("Endpoint elems for value %s expected addr %s port %d got nil", val, expect.GetAddr(), expect.GetPort()) + return + } + if expect.GetAddr() != res.GetAddr() || expect.GetPort() != res.GetPort() { + t.Errorf("Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort()) + } + if expect.String() != res.String() { + t.Errorf("Endpoint string for value %s expected %s got %s", val, expect.String(), res.String()) + } + + } + + testEp(t, "localhost:8080", &RmrEndpoint{"localhost", 8080}) + testEp(t, "127.0.0.1:8080", &RmrEndpoint{"127.0.0.1", 8080}) + testEp(t, "localhost:70000", nil) + testEp(t, "localhost?8080", nil) + testEp(t, "abcdefghijklmnopqrstuvwxyz", nil) + testEp(t, "", nil) +} diff --git a/pkg/xapp/rmrendpointlist.go b/pkg/xapp/rmrendpointlist.go new file mode 100644 index 0000000..2afc004 --- /dev/null +++ b/pkg/xapp/rmrendpointlist.go @@ -0,0 +1,94 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package xapp + +import ( + "strings" +) + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type RmrEndpointList struct { + Endpoints []RmrEndpoint +} + +func (eplist *RmrEndpointList) String() string { + valuesText := eplist.StringList() + return strings.Join(valuesText, ",") +} + +func (eplist *RmrEndpointList) StringList() []string { + tmpList := eplist.Endpoints + valuesText := []string{} + for i := range tmpList { + valuesText = append(valuesText, tmpList[i].String()) + } + return valuesText +} + +func (eplist *RmrEndpointList) Size() int { + return len(eplist.Endpoints) +} + +func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool { + for i := range eplist.Endpoints { + if eplist.Endpoints[i].Equal(ep) { + return false + } + } + eplist.Endpoints = append(eplist.Endpoints, *ep) + return true +} + +func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool { + for i := range eplist.Endpoints { + if eplist.Endpoints[i].Equal(ep) { + eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1] + eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0} + eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1] + return true + } + } + return false +} + +func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool { + var retval bool = false + for i := range otheplist.Endpoints { + if eplist.DelEndpoint(&otheplist.Endpoints[i]) { + retval = true + } + } + return retval +} + +func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool { + for i := range eplist.Endpoints { + if eplist.Endpoints[i].Equal(ep) { + return true + } + } + return false +} + +func NewRmrEndpointList() *RmrEndpointList { + return &RmrEndpointList{} +} diff --git a/pkg/xapp/rmrendpointlist_test.go b/pkg/xapp/rmrendpointlist_test.go new file mode 100644 index 0000000..cbef96e --- /dev/null +++ b/pkg/xapp/rmrendpointlist_test.go @@ -0,0 +1,95 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package xapp + +import ( + "testing" +) + +func TestRmrEndpointList(t *testing.T) { + Logger.Info("CASE: TestRmrEndpointList") + + epl := &RmrEndpointList{} + + // Simple add / has / delete + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + t.Errorf("RmrEndpointList: 8080 add failed") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true { + t.Errorf("RmrEndpointList: 8080 duplicate add success") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + t.Errorf("RmrEndpointList: 8081 add failed") + } + if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + t.Errorf("RmrEndpointList: 8081 has failed") + } + if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + t.Errorf("RmrEndpointList: 8081 del failed") + } + if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true { + t.Errorf("RmrEndpointList: 8081 has non existing success") + } + if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true { + t.Errorf("RmrEndpointList: 8081 del non existing success") + } + if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + t.Errorf("RmrEndpointList: 8080 del failed") + } + + // list delete + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + t.Errorf("RmrEndpointList: 8080 add failed") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + t.Errorf("RmrEndpointList: 8081 add failed") + } + if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false { + t.Errorf("RmrEndpointList: 8082 add failed") + } + + epl2 := &RmrEndpointList{} + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false { + t.Errorf("RmrEndpointList: othlist add 9080 failed") + } + + if epl.DelEndpoints(epl2) == true { + t.Errorf("RmrEndpointList: delete list not existing successs") + } + + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false { + t.Errorf("RmrEndpointList: othlist add 8080 failed") + } + if epl.DelEndpoints(epl2) == false { + t.Errorf("RmrEndpointList: delete list 8080,9080 failed") + } + + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false { + t.Errorf("RmrEndpointList: othlist add 8081 failed") + } + if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false { + t.Errorf("RmrEndpointList: othlist add 8082 failed") + } + + if epl.DelEndpoints(epl2) == false { + t.Errorf("RmrEndpointList: delete list 8080,8081,8082,9080 failed") + } + +} diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go index f3c47eb..571dcba 100755 --- a/pkg/xapp/types.go +++ b/pkg/xapp/types.go @@ -26,6 +26,9 @@ import ( // To be removed ... type RMRStatistics struct{} +// +// +// type RMRClient struct { protPort string contextMux sync.Mutex @@ -39,12 +42,46 @@ type RMRClient struct { readyCbParams interface{} } +// +// +// type RMRMeid struct { PlmnID string EnbID string RanName string } +func (meid *RMRMeid) String() string { + str := "meid(" + pad := "" + if len(meid.PlmnID) > 0 { + str += pad + "PlmnID=" + meid.PlmnID + pad = " " + } + if len(meid.EnbID) > 0 { + str += pad + "EnbID=" + meid.EnbID + pad = " " + } + if len(meid.RanName) > 0 { + str += pad + "RanName=" + meid.RanName + pad = " " + } + str += ")" + return str +} + +// +// +// +type MessageConsumerFunc func(*RMRParams) error + +func (fn MessageConsumerFunc) Consume(params *RMRParams) error { + return fn(params) +} + +// +// +// type MessageConsumer interface { Consume(params *RMRParams) error } -- 2.16.6