General capabilities for xapps 25/4525/2 v0.4.17
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Mon, 10 Aug 2020 05:20:22 +0000 (08:20 +0300)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Mon, 10 Aug 2020 05:30:40 +0000 (08:30 +0300)
- rmrendpoint helper structs
- rmr SendRetry to retry sending for given time (seconds).
  Tries every 500ms until given timeout is reached or message was successfully sent
- few String() functions

Change-Id: I6bb163d85bb35a5375cd0b053d8968ebb0042bc3
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
ci/Dockerfile
pkg/xapp/metrics.go
pkg/xapp/rmr.go
pkg/xapp/rmrendpoint.go [new file with mode: 0644]
pkg/xapp/rmrendpoint_test.go [new file with mode: 0644]
pkg/xapp/rmrendpointlist.go [new file with mode: 0644]
pkg/xapp/rmrendpointlist_test.go [new file with mode: 0644]
pkg/xapp/types.go

index 01e768d..a37b995 100755 (executable)
@@ -17,7 +17,7 @@
 #
 #----------------------------------------------------------
 
-FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:8-u18.04 as xapp-base
+FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:9-u18.04 as xapp-base
 RUN apt-get update -y \
     &&apt-get install -y \
     apt-utils \
index aa4b0f7..c943db6 100644 (file)
@@ -42,6 +42,18 @@ type MetricGroupsCache struct {
        Gauges   map[string]Gauge
 }
 
+func (met *MetricGroupsCache) CInc(metric string) {
+       met.Counters[metric].Inc()
+}
+
+func (met *MetricGroupsCache) CAdd(metric string, val float64) {
+       met.Counters[metric].Add(val)
+}
+
+func (met *MetricGroupsCache) GSet(metric string, val float64) {
+       met.Gauges[metric].Set(val)
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
index 90bc64b..9b91e1a 100755 (executable)
@@ -64,6 +64,8 @@ int wait_epoll(int epoll_fd,int rcv_fd) {
 import "C"
 
 import (
+       "bytes"
+       "crypto/md5"
        "fmt"
        "github.com/spf13/viper"
        "strings"
@@ -98,6 +100,9 @@ var RMRErrors = map[int]string{
        C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type RMRParams struct {
        Mtype      int
        Payload    []byte
@@ -113,6 +118,15 @@ type RMRParams struct {
        status     int
 }
 
+func (params *RMRParams) String() string {
+       var b bytes.Buffer
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
+       return b.String()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
        p := C.CString(protPort)
        m := C.int(maxSize)
@@ -268,6 +282,25 @@ func (m *RMRClient) SendRts(params *RMRParams) bool {
        return m.Send(params, true)
 }
 
+func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
+       status := m.Send(params, isRts)
+       i := 0
+       for ; i < int(to)*2 && status == false; i++ {
+               status = m.Send(params, isRts)
+               if status == false {
+                       time.Sleep(500 * time.Millisecond)
+               }
+       }
+       if status == false {
+               err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
+               if params.Mbuf != nil {
+                       m.Free(params.Mbuf)
+                       params.Mbuf = nil
+               }
+       }
+       return
+}
+
 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
        if params.Mbuf != nil {
                m.Free(params.Mbuf)
diff --git a/pkg/xapp/rmrendpoint.go b/pkg/xapp/rmrendpoint.go
new file mode 100644 (file)
index 0000000..cddcc4a
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+       "strconv"
+       "strings"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpoint struct {
+       Addr string // xapp addr
+       Port uint16 // xapp port
+}
+
+func (endpoint RmrEndpoint) String() string {
+       return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
+}
+
+func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool {
+       if (endpoint.Addr == ep.Addr) &&
+               (endpoint.Port == ep.Port) {
+               return true
+       }
+       return false
+}
+
+func (endpoint *RmrEndpoint) GetAddr() string {
+       return endpoint.Addr
+}
+
+func (endpoint *RmrEndpoint) GetPort() uint16 {
+       return endpoint.Port
+}
+
+func (endpoint *RmrEndpoint) Set(src string) bool {
+       elems := strings.Split(src, ":")
+       if len(elems) == 2 {
+               srcAddr := elems[0]
+               srcPort, err := strconv.ParseUint(elems[1], 10, 16)
+               if err == nil {
+                       endpoint.Addr = srcAddr
+                       endpoint.Port = uint16(srcPort)
+                       return true
+               }
+       }
+       return false
+}
+
+func NewRmrEndpoint(src string) *RmrEndpoint {
+       ep := &RmrEndpoint{}
+       if ep.Set(src) == false {
+               return nil
+       }
+       return ep
+}
diff --git a/pkg/xapp/rmrendpoint_test.go b/pkg/xapp/rmrendpoint_test.go
new file mode 100644 (file)
index 0000000..bea0f0b
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+       "testing"
+)
+
+func TestRmrEndpoint(t *testing.T) {
+       Logger.Info("CASE: TestRmrEndpoint")
+
+       testEp := func(t *testing.T, val string, expect *RmrEndpoint) {
+               res := NewRmrEndpoint(val)
+
+               if expect == nil && res == nil {
+                       return
+               }
+               if res == nil {
+                       t.Errorf("Endpoint elems for value %s expected addr %s port %d got nil", val, expect.GetAddr(), expect.GetPort())
+                       return
+               }
+               if expect.GetAddr() != res.GetAddr() || expect.GetPort() != res.GetPort() {
+                       t.Errorf("Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort())
+               }
+               if expect.String() != res.String() {
+                       t.Errorf("Endpoint string for value %s expected %s got %s", val, expect.String(), res.String())
+               }
+
+       }
+
+       testEp(t, "localhost:8080", &RmrEndpoint{"localhost", 8080})
+       testEp(t, "127.0.0.1:8080", &RmrEndpoint{"127.0.0.1", 8080})
+       testEp(t, "localhost:70000", nil)
+       testEp(t, "localhost?8080", nil)
+       testEp(t, "abcdefghijklmnopqrstuvwxyz", nil)
+       testEp(t, "", nil)
+}
diff --git a/pkg/xapp/rmrendpointlist.go b/pkg/xapp/rmrendpointlist.go
new file mode 100644 (file)
index 0000000..2afc004
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+       "strings"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpointList struct {
+       Endpoints []RmrEndpoint
+}
+
+func (eplist *RmrEndpointList) String() string {
+       valuesText := eplist.StringList()
+       return strings.Join(valuesText, ",")
+}
+
+func (eplist *RmrEndpointList) StringList() []string {
+       tmpList := eplist.Endpoints
+       valuesText := []string{}
+       for i := range tmpList {
+               valuesText = append(valuesText, tmpList[i].String())
+       }
+       return valuesText
+}
+
+func (eplist *RmrEndpointList) Size() int {
+       return len(eplist.Endpoints)
+}
+
+func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
+       for i := range eplist.Endpoints {
+               if eplist.Endpoints[i].Equal(ep) {
+                       return false
+               }
+       }
+       eplist.Endpoints = append(eplist.Endpoints, *ep)
+       return true
+}
+
+func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
+       for i := range eplist.Endpoints {
+               if eplist.Endpoints[i].Equal(ep) {
+                       eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1]
+                       eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0}
+                       eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1]
+                       return true
+               }
+       }
+       return false
+}
+
+func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
+       var retval bool = false
+       for i := range otheplist.Endpoints {
+               if eplist.DelEndpoint(&otheplist.Endpoints[i]) {
+                       retval = true
+               }
+       }
+       return retval
+}
+
+func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
+       for i := range eplist.Endpoints {
+               if eplist.Endpoints[i].Equal(ep) {
+                       return true
+               }
+       }
+       return false
+}
+
+func NewRmrEndpointList() *RmrEndpointList {
+       return &RmrEndpointList{}
+}
diff --git a/pkg/xapp/rmrendpointlist_test.go b/pkg/xapp/rmrendpointlist_test.go
new file mode 100644 (file)
index 0000000..cbef96e
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+       "testing"
+)
+
+func TestRmrEndpointList(t *testing.T) {
+       Logger.Info("CASE: TestRmrEndpointList")
+
+       epl := &RmrEndpointList{}
+
+       // Simple add / has / delete
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               t.Errorf("RmrEndpointList: 8080 add failed")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true {
+               t.Errorf("RmrEndpointList: 8080 duplicate add success")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               t.Errorf("RmrEndpointList: 8081 add failed")
+       }
+       if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               t.Errorf("RmrEndpointList: 8081 has failed")
+       }
+       if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               t.Errorf("RmrEndpointList: 8081 del failed")
+       }
+       if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+               t.Errorf("RmrEndpointList: 8081 has non existing success")
+       }
+       if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+               t.Errorf("RmrEndpointList: 8081 del non existing success")
+       }
+       if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               t.Errorf("RmrEndpointList: 8080 del failed")
+       }
+
+       // list delete
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               t.Errorf("RmrEndpointList: 8080 add failed")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               t.Errorf("RmrEndpointList: 8081 add failed")
+       }
+       if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+               t.Errorf("RmrEndpointList: 8082 add failed")
+       }
+
+       epl2 := &RmrEndpointList{}
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false {
+               t.Errorf("RmrEndpointList: othlist add 9080 failed")
+       }
+
+       if epl.DelEndpoints(epl2) == true {
+               t.Errorf("RmrEndpointList: delete list not existing successs")
+       }
+
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+               t.Errorf("RmrEndpointList: othlist add 8080 failed")
+       }
+       if epl.DelEndpoints(epl2) == false {
+               t.Errorf("RmrEndpointList: delete list 8080,9080 failed")
+       }
+
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+               t.Errorf("RmrEndpointList: othlist add 8081 failed")
+       }
+       if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+               t.Errorf("RmrEndpointList: othlist add 8082 failed")
+       }
+
+       if epl.DelEndpoints(epl2) == false {
+               t.Errorf("RmrEndpointList: delete list 8080,8081,8082,9080 failed")
+       }
+
+}
index f3c47eb..571dcba 100755 (executable)
@@ -26,6 +26,9 @@ import (
 // To be removed ...
 type RMRStatistics struct{}
 
+//
+//
+//
 type RMRClient struct {
        protPort      string
        contextMux    sync.Mutex
@@ -39,12 +42,46 @@ type RMRClient struct {
        readyCbParams interface{}
 }
 
+//
+//
+//
 type RMRMeid struct {
        PlmnID  string
        EnbID   string
        RanName string
 }
 
+func (meid *RMRMeid) String() string {
+       str := "meid("
+       pad := ""
+       if len(meid.PlmnID) > 0 {
+               str += pad + "PlmnID=" + meid.PlmnID
+               pad = " "
+       }
+       if len(meid.EnbID) > 0 {
+               str += pad + "EnbID=" + meid.EnbID
+               pad = " "
+       }
+       if len(meid.RanName) > 0 {
+               str += pad + "RanName=" + meid.RanName
+               pad = " "
+       }
+       str += ")"
+       return str
+}
+
+//
+//
+//
+type MessageConsumerFunc func(*RMRParams) error
+
+func (fn MessageConsumerFunc) Consume(params *RMRParams) error {
+       return fn(params)
+}
+
+//
+//
+//
 type MessageConsumer interface {
        Consume(params *RMRParams) error
 }