[RICPLT-1528] Add atomic counters | Add commands to config | edit logging 67/1467/1
authoris005q <idan.shalom@intl.att.com>
Mon, 11 Nov 2019 10:59:49 +0000 (12:59 +0200)
committeris005q <idan.shalom@intl.att.com>
Mon, 11 Nov 2019 10:59:54 +0000 (12:59 +0200)
Change-Id: I1aa71c428d0b7028b377b417cfc6e1b826fc893a
Signed-off-by: is005q <idan.shalom@intl.att.com>
tools/xappmock/dispatcher/dispatcher.go
tools/xappmock/go.mod
tools/xappmock/go.sum
tools/xappmock/models/message_info.go [new file with mode: 0644]
tools/xappmock/models/process_result.go
tools/xappmock/resources/conf/configuration.json
tools/xappmock/rmr/rmrEndPoint.go
tools/xappmock/sender/jsonSender.go

index 7243cde..bd75e7b 100644 (file)
@@ -16,6 +16,7 @@ import (
 
 // Id -> Command
 var configuration = make(map[string]*models.JsonCommand)
+
 // Rmr Message Id -> Command
 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
 
@@ -31,8 +32,8 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma
 }
 
 type Dispatcher struct {
-       rmrService              *rmr.Service
-       processResult           models.ProcessResult
+       rmrService    *rmr.Service
+       processResult models.ProcessResult
 }
 
 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
@@ -41,7 +42,7 @@ func (d *Dispatcher) GetProcessResult() models.ProcessResult {
 
 func New(rmrService *rmr.Service) *Dispatcher {
        return &Dispatcher{
-               rmrService:              rmrService,
+               rmrService: rmrService,
        }
 }
 
@@ -65,11 +66,11 @@ func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
        if err != nil {
                log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
                d.processResult.Err = err
-               d.processResult.Stats.SentErrorCount++
+               d.processResult.Stats.SentErrorCount.Inc()
                return err
        }
 
-       d.processResult.Stats.SentCount++
+       d.processResult.Stats.SentCount.Inc()
        return nil
 }
 
@@ -86,11 +87,11 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm
 
                if err != nil {
                        log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
-                       d.processResult.Stats.SentErrorCount++
+                       d.processResult.Stats.SentErrorCount.Inc()
                        continue
                }
 
-               d.processResult.Stats.SentCount++
+               d.processResult.Stats.SentCount.Inc()
                time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
        }
 }
@@ -212,6 +213,16 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom
        sendAndReceiveWg.Wait()
 }
 
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+       responseCommand, ok := configuration[command.SendCommandId]
+
+       if !ok {
+               return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+       }
+
+       return responseCommand, nil
+}
+
 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
        for {
                select {
@@ -223,24 +234,33 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
+                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
                        d.processResult.Err = err
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        return
                }
 
+               messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               log.Printf("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
 
                if len(command.SendCommandId) > 0 {
-                       responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
+                       responseCommand, err := getResponseCommand(command)
+
+                       if err != nil {
+                               d.processResult.Err = err
+                               return
+                       }
+
                        _ = d.sendNoRepeat(*responseCommand)
                }
 
@@ -249,7 +269,20 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models
 }
 
 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
-       for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
+
+       var responseCommand *models.JsonCommand
+
+       if len(command.SendCommandId) > 0 {
+               var err error
+               responseCommand, err = getResponseCommand(command)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+       }
+
+       for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
                select {
                case <-ctx.Done():
                        return
@@ -260,23 +293,24 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command mode
 
                if err != nil {
                        log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
-                       d.processResult.Stats.ReceivedErrorCount++
+                       d.processResult.Stats.ReceivedErrorCount.Inc()
                        continue
                }
 
+               messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
-                       d.processResult.Stats.ReceivedUnexpectedCount++
+                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+                       d.processResult.Stats.ReceivedUnexpectedCount.Inc()
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
-               d.processResult.Stats.ReceivedExpectedCount++
+               log.Printf("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+               d.processResult.Stats.ReceivedExpectedCount.Inc()
 
-               if len(command.SendCommandId) > 0 {
-                       responseCommand := configuration[command.SendCommandId]
+               if responseCommand != nil {
                        _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
                }
        }
index f870a4c..72c7ae1 100644 (file)
@@ -2,4 +2,7 @@ module xappmock
 
 go 1.12
 
-require github.com/pkg/errors v0.8.1
+require (
+       github.com/pkg/errors v0.8.1
+       go.uber.org/atomic v1.5.0
+)
index f29ab35..766a895 100644 (file)
@@ -1,2 +1,19 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
+go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/tools/xappmock/models/message_info.go b/tools/xappmock/models/message_info.go
new file mode 100644 (file)
index 0000000..f1c65e0
--- /dev/null
@@ -0,0 +1,48 @@
+//
+// Copyright 2019 AT&T Intellectual Property
+// Copyright 2019 Nokia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package models
+
+import (
+       "encoding/json"
+       "fmt"
+       "time"
+)
+
+// TODO: message command id / source / dest
+
+type MessageInfo struct {
+       MessageTimestamp int64  `json:"messageTimestamp"`
+       MessageType      int    `json:"messageType"`
+       Meid             string `json:"meid"`
+       Payload          string `json:"payload"`
+       TransactionId    string `json:"transactionId"`
+}
+
+func GetMessageInfoAsJson(messageType int, meid string, payload []byte, transactionId []byte) string {
+       messageInfo := MessageInfo{
+               MessageTimestamp: time.Now().Unix(),
+               MessageType:      messageType,
+               Meid:             meid,
+               Payload:          fmt.Sprintf("%x", payload),
+               TransactionId:    string(transactionId),
+       }
+
+       jsonData, _ := json.Marshal(messageInfo)
+
+       return string(jsonData)
+}
index eb9543f..9ceede7 100644 (file)
 
 package models
 
-import "fmt"
+import (
+       "fmt"
+       "go.uber.org/atomic"
+)
 
 type ProcessStats struct {
-       SentCount               int
-       SentErrorCount          int
-       ReceivedExpectedCount   int
-       ReceivedUnexpectedCount int
-       ReceivedErrorCount      int
+       SentCount               atomic.Int32
+       SentErrorCount          atomic.Int32
+       ReceivedExpectedCount   atomic.Int32
+       ReceivedUnexpectedCount atomic.Int32
+       ReceivedErrorCount      atomic.Int32
 }
 
 type ProcessResult struct {
@@ -33,7 +36,7 @@ type ProcessResult struct {
 }
 
 func (pr ProcessResult) String() string {
-       return fmt.Sprintf("\nNumber of sent messages: %d\nNumber of send errors: %d\n" +
-               "Number of expected received messages: %d\nNumber of unexpected received messages: %d\n" +
+       return fmt.Sprintf("\nNumber of sent messages: %d\nNumber of send errors: %d\n"+
+               "Number of expected received messages: %d\nNumber of unexpected received messages: %d\n"+
                "Number of receive errors: %d\n", pr.Stats.SentCount, pr.Stats.SentErrorCount, pr.Stats.ReceivedExpectedCount, pr.Stats.ReceivedUnexpectedCount, pr.Stats.ReceivedErrorCount)
 }
index 1bdcb2a..bc55dfc 100644 (file)
@@ -7,6 +7,19 @@
     "payloadHeader": "$ranIp|$ranPort|$ranName|#packedPayload|",
     "packedPayload": "0006002a000002001500080013302300fffff000140017000001f700133023fffff0000000133023000000000001"
   },
+  {
+    "id": "RIC_ENDC_SETUP_REQ",
+    "action": "send",
+    "rmrMessageType": "10360",
+    "transactionId": "e2e$",
+    "payloadHeader": "$ranIp|$ranPort|$ranName|#packedPayload|",
+    "packedPayload": ""
+  },
+  {
+    "id": "RIC_ENDC_SETUP_RESPONSE",
+    "rmrMessageType": "10361",
+    "packedPayload": ""
+  },
   {
     "id": "RIC_ENB_CONF_UPDATE_ACK_positive",
     "sendRmrMessageType": "10081",
@@ -29,7 +42,7 @@
     "id": "RIC_SUBSCRIPTION_FAILURE",
     "rmrMessageType": "12012",
     "transactionId": "e2e$",
-    "packedPayload":"40c9001f000003ea7e00050000010001ea6300020000ea6e000908ea6f400400014100"
+    "packedPayload": "40c9001f000003ea7e00050000010001ea6300020000ea6e000908ea6f400400014100"
   },
   {
     "id": "RIC_SUBSCRIPTION_DELETE_REQ",
     "packedPayload": "2006002a000002001500080002f82900007ab000140017000000630002f8290007ab00102002f829000001000133"
   },
   {
-    "id": "RIC_INDICATION_REQ",
-    "rmrMessageType": "",
+    "id": "RIC_INDICATION",
+    "rmrMessageType": "12050"
+  },
+  {
+    "id": "RESOURCE_STATUS_REQUEST",
+    "rmrMessageType": "10090",
+    "transactionId": "e2e$",
+    "packedPayload": "0009003c0000080027000300000e001c00010000260004fe000000001d400d00001f4008000a0b0cabcd8000001e4001000040400100006d4001000091400100"
+  },
+  {
+    "id": "RESOURCE_STATUS_RESPONSE",
+    "rmrMessageType": "10091",
     "packedPayload": ""
+  },
+  {
+    "id": "RESOURCE_STATUS_UPDATE",
+    "rmrMessageType": "10090"
+  },
+  {
+    "id": "LOAD_INFORMATION",
+    "rmrMessageType": "10020"
   }
 ]
 
index fa9cceb..957d4f0 100644 (file)
@@ -17,7 +17,6 @@
 package rmr
 
 import (
-       "log"
        "strconv"
 )
 
@@ -34,7 +33,6 @@ func NewService(rmrConfig Config, messenger Messenger) *Service {
 }
 
 func (r *Service) SendMessage(messageType int, meid string, msg []byte, transactionId []byte) (*MBuf, error) {
-       log.Printf("#rmr.Service.SendMessage - type: %d, tid: %s, msg: %v", messageType, transactionId, msg)
        mbuf := NewMBuf(messageType, len(msg), msg, transactionId)
        mbuf.Meid = meid
        return (*r.messenger).SendMsg(mbuf)
index b58e441..1af7643 100644 (file)
@@ -49,7 +49,12 @@ func SendJsonRmrMessage(command models.JsonCommand /*the copy is modified locall
        if err != nil {
                return errors.New(fmt.Sprintf("invalid rmr message id: %s", command.RmrMessageType))
        }
-       _, err = r.SendMessage(int(rmrMsgId), command.Meid, append([]byte(command.PayloadHeader), payload...), []byte(command.TransactionId))
+
+       msg := append([]byte(command.PayloadHeader), payload...)
+       messageInfo := models.GetMessageInfoAsJson(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
+       log.Printf("#rmr.Service.SendMessage - %s", messageInfo)
+
+       _, err = r.SendMessage(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
        return err
 }