}
type Dispatcher struct {
- rmrService *rmr.Service
- processResult models.ProcessResult
+ rmrService *rmr.Service
+ processResult models.ProcessResult
}
func (d *Dispatcher) GetProcessResult() models.ProcessResult {
func New(rmrService *rmr.Service) *Dispatcher {
return &Dispatcher{
- rmrService: rmrService,
+ rmrService: rmrService,
}
}
return errors.New(fmt.Sprintf("invalid cmd, no id"))
}
configuration[cmd.Id] = &cmd
+ return nil
- if len(cmd.ReceiveRmrMessageType) == 0 {
- return nil
- }
-
- return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd)
+ // if len(cmd.ReceiveCommandId) == 0 {
+ // return nil
+ // }
+ //
+ // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
}
-func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) {
+func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
if err != nil {
- log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
+ log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
d.processResult.Err = err
d.processResult.Stats.SentErrorCount++
- return
+ return err
}
d.processResult.Stats.SentCount++
-
+ return nil
}
func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
if err != nil {
- log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
+ log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
d.processResult.Stats.SentErrorCount++
continue
}
}
}
+func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
+ command, ok := configuration[receiveCommandId]
+
+ if !ok {
+ return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
+ }
+
+ if len(command.RmrMessageType) == 0 {
+ return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
+ }
+
+ return command.RmrMessageType, nil
+}
+
func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
defer sendAndReceiveWg.Done()
var listenAndHandleWg sync.WaitGroup
- if len(command.ReceiveRmrMessageType) > 0 {
- err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+ if len(command.ReceiveCommandId) > 0 {
+ rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
+
+ if err != nil {
+ d.processResult.Err = err
+ return
+ }
+
+ err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
if err != nil {
d.processResult.Err = err
}
listenAndHandleWg.Add(1)
- go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+ go d.listenAndHandle(ctx, &listenAndHandleWg, command)
}
if command.RepeatCount == 0 {
- d.sendNoRepeat(command)
+ err := d.sendNoRepeat(command)
+
+ if err != nil {
+ return
+ }
+
} else {
d.sendWithRepeat(ctx, command)
}
- if len(command.ReceiveRmrMessageType) > 0 {
+ if len(command.ReceiveCommandId) > 0 {
listenAndHandleWg.Wait()
}
}
defer sendAndReceiveWg.Done()
- err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+ err := addRmrMessageToWaitFor(command.RmrMessageType, command)
if err != nil {
d.processResult.Err = err
var listenAndHandleWg sync.WaitGroup
listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
- d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+ d.listenAndHandle(ctx, &listenAndHandleWg, command)
}
func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
sendAndReceiveWg.Wait()
}
-func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
+func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
for {
select {
case <-ctx.Done():
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
+ log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
d.processResult.Stats.ReceivedUnexpectedCount++
continue
}
- log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
+ log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
d.processResult.Stats.ReceivedExpectedCount++
- return
- }
-}
-func (d *Dispatcher) receive(ctx context.Context) {
+ if len(command.SendCommandId) > 0 {
+ responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
+ _ = d.sendNoRepeat(*responseCommand)
+ }
+ return
+ }
}
-func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) {
- for d.processResult.Stats.ReceivedExpectedCount < repeatCount {
+func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
+ for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
select {
case <-ctx.Done():
return
mbuf, err := d.rmrService.RecvMessage()
if err != nil {
- log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err)
+ log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
d.processResult.Stats.ReceivedErrorCount++
continue
}
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
+ log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
d.processResult.Stats.ReceivedUnexpectedCount++
continue
}
- log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
+ log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
d.processResult.Stats.ReceivedExpectedCount++
+
+ if len(command.SendCommandId) > 0 {
+ responseCommand := configuration[command.SendCommandId]
+ _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
+ }
}
}
-func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) {
+func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
defer listenAndHandleWg.Done()
- if repeatCount == 0 {
- d.listenAndHandleNoRepeat(ctx)
+ if command.RepeatCount == 0 {
+ d.listenAndHandleNoRepeat(ctx, command)
return
}
- d.listenAndHandleWithRepeat(ctx, repeatCount)
+ d.listenAndHandleWithRepeat(ctx, command)
}
func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
[
{
"id": "RIC_X2_SETUP_REQ",
- "sendRmrMessageType": "10060",
+ "action": "send",
+ "rmrMessageType": "10060",
"transactionId": "e2e$",
"payloadHeader": "$ranIp|$ranPort|$ranName|#packedPayload|",
"packedPayload": "0006002a000002001500080013302300fffff000140017000001f700133023fffff0000000133023000000000001"
"transactionId": "e2e$",
"packedPayload": "2025000a00000100f70003000000"
},
+ {
+ "id": "RIC_SUBSCRIPTION_REQ",
+ "rmrMessageType": "12010",
+ "transactionId": "e2e$",
+ "packedPayload": "00c9002b000003ea7e0005004eec0182ea6300020001ea810015000a0011121300212224264000ea6b000420000000"
+ },
{
"id": "RIC_SUBSCRIPTION_RESPONSE",
- "receiveRmrMessageType": "12011",
+ "rmrMessageType": "12011",
+ "transactionId": "e2e$",
+ "packedPayload": "20c9001d000003ea7e0005004eec0182ea6300020001ea6c000700ea6d40020000"
+ },
+ {
+ "id": "RIC_SUBSCRIPTION_FAILURE",
+ "rmrMessageType": "12012",
+ "transactionId": "e2e$",
+ "packedPayload":"40c9001f000003ea7e00050000010001ea6300020000ea6e000908ea6f400400014100"
+ },
+ {
+ "id": "RIC_SUBSCRIPTION_DELETE_REQ",
+ "rmrMessageType": "12020",
"transactionId": "e2e$",
- "packedPayload": "20c9001d000003ea7e000500abba0001ea6300020001ea6c000700ea6d40020000"
+ "packedPayload": "00ca0012000002ea7e0005004eec0182ea6300020001"
+ },
+ {
+ "id": "RIC_SUBSCRIPTION_DELETE_RESPONSE",
+ "rmrMessageType": "12021",
+ "transactionId": "e2e$",
+ "packedPayload": "20ca0012000002ea7e0005004eec0182ea6300020001"
+ },
+ {
+ "id": "RIC_X2_SETUP_RESPONSE",
+ "rmrMessageType": "10061",
+ "packedPayload": "2006002a000002001500080002f82900007ab000140017000000630002f8290007ab00102002f829000001000133"
},
{
- "id": "RIC_X2_SETUP_RESPONSE"
+ "id": "RIC_INDICATION_REQ",
+ "rmrMessageType": "",
+ "packedPayload": ""
}
]
+