// Id -> Command
var configuration = make(map[string]*models.JsonCommand)
+
// Rmr Message Id -> Command
var waitForRmrMessageType = make(map[int]*models.JsonCommand)
}
type Dispatcher struct {
- rmrService *rmr.Service
- processResult models.ProcessResult
+ rmrService *rmr.Service
+ processResult models.ProcessResult
}
func (d *Dispatcher) GetProcessResult() models.ProcessResult {
func New(rmrService *rmr.Service) *Dispatcher {
return &Dispatcher{
- rmrService: rmrService,
+ rmrService: rmrService,
}
}
if err != nil {
log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
d.processResult.Err = err
- d.processResult.Stats.SentErrorCount++
+ d.processResult.Stats.SentErrorCount.Inc()
return err
}
- d.processResult.Stats.SentCount++
+ d.processResult.Stats.SentCount.Inc()
return nil
}
if err != nil {
log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
- d.processResult.Stats.SentErrorCount++
+ d.processResult.Stats.SentErrorCount.Inc()
continue
}
- d.processResult.Stats.SentCount++
+ d.processResult.Stats.SentCount.Inc()
time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
}
}
sendAndReceiveWg.Wait()
}
+func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
+ responseCommand, ok := configuration[command.SendCommandId]
+
+ if !ok {
+ return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
+ }
+
+ return responseCommand, nil
+}
+
func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
for {
select {
mbuf, err := d.rmrService.RecvMessage()
if err != nil {
+ log.Printf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
d.processResult.Err = err
- d.processResult.Stats.ReceivedErrorCount++
+ d.processResult.Stats.ReceivedErrorCount.Inc()
return
}
+ messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
- d.processResult.Stats.ReceivedUnexpectedCount++
+ log.Printf("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedUnexpectedCount.Inc()
continue
}
- log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
- d.processResult.Stats.ReceivedExpectedCount++
+ log.Printf("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedExpectedCount.Inc()
if len(command.SendCommandId) > 0 {
- responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
+ responseCommand, err := getResponseCommand(command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
_ = d.sendNoRepeat(*responseCommand)
}
}
func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
- for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
+
+ var responseCommand *models.JsonCommand
+
+ if len(command.SendCommandId) > 0 {
+ var err error
+ responseCommand, err = getResponseCommand(command)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+ }
+
+ for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
select {
case <-ctx.Done():
return
if err != nil {
log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
- d.processResult.Stats.ReceivedErrorCount++
+ d.processResult.Stats.ReceivedErrorCount.Inc()
continue
}
+ messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
- d.processResult.Stats.ReceivedUnexpectedCount++
+ log.Printf("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedUnexpectedCount.Inc()
continue
}
- log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
- d.processResult.Stats.ReceivedExpectedCount++
+ log.Printf("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+ d.processResult.Stats.ReceivedExpectedCount.Inc()
- if len(command.SendCommandId) > 0 {
- responseCommand := configuration[command.SendCommandId]
+ if responseCommand != nil {
_ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
}
}
go 1.12
-require github.com/pkg/errors v0.8.1
+require (
+ github.com/pkg/errors v0.8.1
+ go.uber.org/atomic v1.5.0
+)
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
+go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
--- /dev/null
+//
+// Copyright 2019 AT&T Intellectual Property
+// Copyright 2019 Nokia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package models
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+)
+
+// TODO: message command id / source / dest
+
+type MessageInfo struct {
+ MessageTimestamp int64 `json:"messageTimestamp"`
+ MessageType int `json:"messageType"`
+ Meid string `json:"meid"`
+ Payload string `json:"payload"`
+ TransactionId string `json:"transactionId"`
+}
+
+func GetMessageInfoAsJson(messageType int, meid string, payload []byte, transactionId []byte) string {
+ messageInfo := MessageInfo{
+ MessageTimestamp: time.Now().Unix(),
+ MessageType: messageType,
+ Meid: meid,
+ Payload: fmt.Sprintf("%x", payload),
+ TransactionId: string(transactionId),
+ }
+
+ jsonData, _ := json.Marshal(messageInfo)
+
+ return string(jsonData)
+}
package models
-import "fmt"
+import (
+ "fmt"
+ "go.uber.org/atomic"
+)
type ProcessStats struct {
- SentCount int
- SentErrorCount int
- ReceivedExpectedCount int
- ReceivedUnexpectedCount int
- ReceivedErrorCount int
+ SentCount atomic.Int32
+ SentErrorCount atomic.Int32
+ ReceivedExpectedCount atomic.Int32
+ ReceivedUnexpectedCount atomic.Int32
+ ReceivedErrorCount atomic.Int32
}
type ProcessResult struct {
}
func (pr ProcessResult) String() string {
- return fmt.Sprintf("\nNumber of sent messages: %d\nNumber of send errors: %d\n" +
- "Number of expected received messages: %d\nNumber of unexpected received messages: %d\n" +
+ return fmt.Sprintf("\nNumber of sent messages: %d\nNumber of send errors: %d\n"+
+ "Number of expected received messages: %d\nNumber of unexpected received messages: %d\n"+
"Number of receive errors: %d\n", pr.Stats.SentCount, pr.Stats.SentErrorCount, pr.Stats.ReceivedExpectedCount, pr.Stats.ReceivedUnexpectedCount, pr.Stats.ReceivedErrorCount)
}
"payloadHeader": "$ranIp|$ranPort|$ranName|#packedPayload|",
"packedPayload": "0006002a000002001500080013302300fffff000140017000001f700133023fffff0000000133023000000000001"
},
+ {
+ "id": "RIC_ENDC_SETUP_REQ",
+ "action": "send",
+ "rmrMessageType": "10360",
+ "transactionId": "e2e$",
+ "payloadHeader": "$ranIp|$ranPort|$ranName|#packedPayload|",
+ "packedPayload": ""
+ },
+ {
+ "id": "RIC_ENDC_SETUP_RESPONSE",
+ "rmrMessageType": "10361",
+ "packedPayload": ""
+ },
{
"id": "RIC_ENB_CONF_UPDATE_ACK_positive",
"sendRmrMessageType": "10081",
"id": "RIC_SUBSCRIPTION_FAILURE",
"rmrMessageType": "12012",
"transactionId": "e2e$",
- "packedPayload":"40c9001f000003ea7e00050000010001ea6300020000ea6e000908ea6f400400014100"
+ "packedPayload": "40c9001f000003ea7e00050000010001ea6300020000ea6e000908ea6f400400014100"
},
{
"id": "RIC_SUBSCRIPTION_DELETE_REQ",
"packedPayload": "2006002a000002001500080002f82900007ab000140017000000630002f8290007ab00102002f829000001000133"
},
{
- "id": "RIC_INDICATION_REQ",
- "rmrMessageType": "",
+ "id": "RIC_INDICATION",
+ "rmrMessageType": "12050"
+ },
+ {
+ "id": "RESOURCE_STATUS_REQUEST",
+ "rmrMessageType": "10090",
+ "transactionId": "e2e$",
+ "packedPayload": "0009003c0000080027000300000e001c00010000260004fe000000001d400d00001f4008000a0b0cabcd8000001e4001000040400100006d4001000091400100"
+ },
+ {
+ "id": "RESOURCE_STATUS_RESPONSE",
+ "rmrMessageType": "10091",
"packedPayload": ""
+ },
+ {
+ "id": "RESOURCE_STATUS_UPDATE",
+ "rmrMessageType": "10090"
+ },
+ {
+ "id": "LOAD_INFORMATION",
+ "rmrMessageType": "10020"
}
]
package rmr
import (
- "log"
"strconv"
)
}
func (r *Service) SendMessage(messageType int, meid string, msg []byte, transactionId []byte) (*MBuf, error) {
- log.Printf("#rmr.Service.SendMessage - type: %d, tid: %s, msg: %v", messageType, transactionId, msg)
mbuf := NewMBuf(messageType, len(msg), msg, transactionId)
mbuf.Meid = meid
return (*r.messenger).SendMsg(mbuf)
if err != nil {
return errors.New(fmt.Sprintf("invalid rmr message id: %s", command.RmrMessageType))
}
- _, err = r.SendMessage(int(rmrMsgId), command.Meid, append([]byte(command.PayloadHeader), payload...), []byte(command.TransactionId))
+
+ msg := append([]byte(command.PayloadHeader), payload...)
+ messageInfo := models.GetMessageInfoAsJson(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
+ log.Printf("#rmr.Service.SendMessage - %s", messageInfo)
+
+ _, err = r.SendMessage(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
return err
}