[RICPLT-1528] xApp Mock dev..... 03/1403/1
authoris005q <idan.shalom@intl.att.com>
Thu, 7 Nov 2019 13:49:10 +0000 (15:49 +0200)
committeris005q <idan.shalom@intl.att.com>
Thu, 7 Nov 2019 13:49:14 +0000 (15:49 +0200)
Change-Id: Idff3816ae23acfc12a2890cb017bbfd59c05fd0f
Signed-off-by: is005q <idan.shalom@intl.att.com>
tools/xappmock/dispatcher/dispatcher.go
tools/xappmock/models/json_command.go
tools/xappmock/resources/conf/configuration.json
tools/xappmock/resources/router.txt
tools/xappmock/sender/jsonSender.go

index 1bf65f1..7243cde 100644 (file)
@@ -31,8 +31,8 @@ func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonComma
 }
 
 type Dispatcher struct {
-       rmrService    *rmr.Service
-       processResult models.ProcessResult
+       rmrService              *rmr.Service
+       processResult           models.ProcessResult
 }
 
 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
@@ -41,7 +41,7 @@ func (d *Dispatcher) GetProcessResult() models.ProcessResult {
 
 func New(rmrService *rmr.Service) *Dispatcher {
        return &Dispatcher{
-               rmrService: rmrService,
+               rmrService:              rmrService,
        }
 }
 
@@ -50,26 +50,27 @@ func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
                return errors.New(fmt.Sprintf("invalid cmd, no id"))
        }
        configuration[cmd.Id] = &cmd
+       return nil
 
-       if len(cmd.ReceiveRmrMessageType) == 0 {
-               return nil
-       }
-
-       return addRmrMessageToWaitFor(cmd.ReceiveRmrMessageType, cmd)
+       //      if len(cmd.ReceiveCommandId) == 0 {
+       //              return nil
+       //      }
+       //
+       //      return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
 }
 
-func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) {
+func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
        err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
 
        if err != nil {
-               log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
+               log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
                d.processResult.Err = err
                d.processResult.Stats.SentErrorCount++
-               return
+               return err
        }
 
        d.processResult.Stats.SentCount++
-
+       return nil
 }
 
 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
@@ -84,7 +85,7 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm
                err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
 
                if err != nil {
-                       log.Printf("Dispatcher.sendHandler - error sending rmr message: %s", err)
+                       log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
                        d.processResult.Stats.SentErrorCount++
                        continue
                }
@@ -94,13 +95,34 @@ func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonComm
        }
 }
 
+func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
+       command, ok := configuration[receiveCommandId]
+
+       if !ok {
+               return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
+       }
+
+       if len(command.RmrMessageType) == 0 {
+               return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
+       }
+
+       return command.RmrMessageType, nil
+}
+
 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
 
        defer sendAndReceiveWg.Done()
        var listenAndHandleWg sync.WaitGroup
 
-       if len(command.ReceiveRmrMessageType) > 0 {
-               err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+       if len(command.ReceiveCommandId) > 0 {
+               rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
+
+               if err != nil {
+                       d.processResult.Err = err
+                       return
+               }
+
+               err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
 
                if err != nil {
                        d.processResult.Err = err
@@ -108,16 +130,21 @@ func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.Wai
                }
 
                listenAndHandleWg.Add(1)
-               go d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+               go d.listenAndHandle(ctx, &listenAndHandleWg, command)
        }
 
        if command.RepeatCount == 0 {
-               d.sendNoRepeat(command)
+               err := d.sendNoRepeat(command)
+
+               if err != nil {
+                       return
+               }
+
        } else {
                d.sendWithRepeat(ctx, command)
        }
 
-       if len(command.ReceiveRmrMessageType) > 0 {
+       if len(command.ReceiveCommandId) > 0 {
                listenAndHandleWg.Wait()
        }
 }
@@ -126,7 +153,7 @@ func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.
 
        defer sendAndReceiveWg.Done()
 
-       err := addRmrMessageToWaitFor(command.ReceiveRmrMessageType, command)
+       err := addRmrMessageToWaitFor(command.RmrMessageType, command)
 
        if err != nil {
                d.processResult.Err = err
@@ -135,7 +162,7 @@ func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.
 
        var listenAndHandleWg sync.WaitGroup
        listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
-       d.listenAndHandle(ctx, &listenAndHandleWg, command.RepeatCount)
+       d.listenAndHandle(ctx, &listenAndHandleWg, command)
 }
 
 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
@@ -185,7 +212,7 @@ func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCom
        sendAndReceiveWg.Wait()
 }
 
-func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
+func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
        for {
                select {
                case <-ctx.Done():
@@ -204,23 +231,25 @@ func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context) {
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
+                       log.Printf("#Dispatcher.listenAndHandleNoRepeat - Unexpected msg: %s", mbuf)
                        d.processResult.Stats.ReceivedUnexpectedCount++
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
+               log.Printf("#Dispatcher.listenAndHandleNoRepeat - expected msg: %s", mbuf)
                d.processResult.Stats.ReceivedExpectedCount++
-               return
-       }
-}
 
-func (d *Dispatcher) receive(ctx context.Context) {
+               if len(command.SendCommandId) > 0 {
+                       responseCommand := configuration[command.SendCommandId] // TODO: safe getResponseCommand
+                       _ = d.sendNoRepeat(*responseCommand)
+               }
 
+               return
+       }
 }
 
-func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount int) {
-       for d.processResult.Stats.ReceivedExpectedCount < repeatCount {
+func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
+       for d.processResult.Stats.ReceivedExpectedCount < command.RepeatCount {
                select {
                case <-ctx.Done():
                        return
@@ -230,7 +259,7 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount
                mbuf, err := d.rmrService.RecvMessage()
 
                if err != nil {
-                       log.Printf("#Dispatcher.listenAndHandle - error receiving message: %s", err)
+                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
                        d.processResult.Stats.ReceivedErrorCount++
                        continue
                }
@@ -238,26 +267,31 @@ func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, repeatCount
                _, ok := waitForRmrMessageType[mbuf.MType]
 
                if !ok {
-                       log.Printf("#Dispatcher.listenAndHandle - Unexpected msg: %s", mbuf)
+                       log.Printf("#Dispatcher.listenAndHandleWithRepeat - Unexpected msg: %s", mbuf)
                        d.processResult.Stats.ReceivedUnexpectedCount++
                        continue
                }
 
-               log.Printf("#Dispatcher.listenAndHandle - expected msg: %s", mbuf)
+               log.Printf("#Dispatcher.listenAndHandleWithRepeat - expected msg: %s", mbuf)
                d.processResult.Stats.ReceivedExpectedCount++
+
+               if len(command.SendCommandId) > 0 {
+                       responseCommand := configuration[command.SendCommandId]
+                       _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
+               }
        }
 }
 
-func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, repeatCount int) {
+func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
 
        defer listenAndHandleWg.Done()
 
-       if repeatCount == 0 {
-               d.listenAndHandleNoRepeat(ctx)
+       if command.RepeatCount == 0 {
+               d.listenAndHandleNoRepeat(ctx, command)
                return
        }
 
-       d.listenAndHandleWithRepeat(ctx, repeatCount)
+       d.listenAndHandleWithRepeat(ctx, command)
 }
 
 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
index beccbb2..382ee8d 100644 (file)
 package models
 
 type JsonCommand struct {
-       Id                    string
-       SendRmrMessageType    string
-       TransactionId         string
-       RanName               string
-       Meid                  string
-       RanIp                 string
-       RanPort               int
-       PayloadHeader         string
-       PackedPayload         string
-       Payload               string
-       Action                string
-       ReceiveRmrMessageType string
-       RepeatCount           int
-       RepeatDelayInMs       int
+       Id               string
+       RmrMessageType   string
+       SendCommandId    string
+       ReceiveCommandId string
+       TransactionId    string
+       RanName          string
+       Meid             string
+       RanIp            string
+       RanPort          int
+       PayloadHeader    string
+       PackedPayload    string
+       Payload          string
+       Action           string
+       RepeatCount      int
+       RepeatDelayInMs  int
 }
index 5b8eaf7..1bdcb2a 100644 (file)
@@ -1,7 +1,8 @@
 [
   {
     "id": "RIC_X2_SETUP_REQ",
-    "sendRmrMessageType": "10060",
+    "action": "send",
+    "rmrMessageType": "10060",
     "transactionId": "e2e$",
     "payloadHeader": "$ranIp|$ranPort|$ranName|#packedPayload|",
     "packedPayload": "0006002a000002001500080013302300fffff000140017000001f700133023fffff0000000133023000000000001"
     "transactionId": "e2e$",
     "packedPayload": "2025000a00000100f70003000000"
   },
+  {
+    "id": "RIC_SUBSCRIPTION_REQ",
+    "rmrMessageType": "12010",
+    "transactionId": "e2e$",
+    "packedPayload": "00c9002b000003ea7e0005004eec0182ea6300020001ea810015000a0011121300212224264000ea6b000420000000"
+  },
   {
     "id": "RIC_SUBSCRIPTION_RESPONSE",
-    "receiveRmrMessageType": "12011",
+    "rmrMessageType": "12011",
+    "transactionId": "e2e$",
+    "packedPayload": "20c9001d000003ea7e0005004eec0182ea6300020001ea6c000700ea6d40020000"
+  },
+  {
+    "id": "RIC_SUBSCRIPTION_FAILURE",
+    "rmrMessageType": "12012",
+    "transactionId": "e2e$",
+    "packedPayload":"40c9001f000003ea7e00050000010001ea6300020000ea6e000908ea6f400400014100"
+  },
+  {
+    "id": "RIC_SUBSCRIPTION_DELETE_REQ",
+    "rmrMessageType": "12020",
     "transactionId": "e2e$",
-    "packedPayload": "20c9001d000003ea7e000500abba0001ea6300020001ea6c000700ea6d40020000"
+    "packedPayload": "00ca0012000002ea7e0005004eec0182ea6300020001"
+  },
+  {
+    "id": "RIC_SUBSCRIPTION_DELETE_RESPONSE",
+    "rmrMessageType": "12021",
+    "transactionId": "e2e$",
+    "packedPayload": "20ca0012000002ea7e0005004eec0182ea6300020001"
+  },
+  {
+    "id": "RIC_X2_SETUP_RESPONSE",
+    "rmrMessageType": "10061",
+    "packedPayload": "2006002a000002001500080002f82900007ab000140017000000630002f8290007ab00102002f829000001000133"
   },
   {
-    "id": "RIC_X2_SETUP_RESPONSE"
+    "id": "RIC_INDICATION_REQ",
+    "rmrMessageType": "",
+    "packedPayload": ""
   }
 ]
+
index baf8eec..d013ab8 100644 (file)
@@ -21,4 +21,7 @@ rte|1200|10.0.2.15:4801
 rte|1210|10.0.2.15:4801
 rte|1220|10.0.2.15:4801
 rte|10090|10.0.2.15:38000
+rte|12010|10.0.2.15:38000
+rte|12011|10.0.2.15:5555
+rte|12020|10.0.2.15:38000
 newrt|end
index e71f3bb..b58e441 100644 (file)
@@ -45,9 +45,9 @@ func SendJsonRmrMessage(command models.JsonCommand /*the copy is modified locall
        }
        command.PayloadHeader = expandPayloadHeader(command.PayloadHeader, &command)
        log.Printf("#jsonSender.SendJsonRmrMessage - command payload header: %s", command.PayloadHeader)
-       rmrMsgId, err := rmr.MessageIdToUint(command.SendRmrMessageType)
+       rmrMsgId, err := rmr.MessageIdToUint(command.RmrMessageType)
        if err != nil {
-               return errors.New(fmt.Sprintf("invalid rmr message id: %s", command.SendRmrMessageType))
+               return errors.New(fmt.Sprintf("invalid rmr message id: %s", command.RmrMessageType))
        }
        _, err = r.SendMessage(int(rmrMsgId), command.Meid, append([]byte(command.PayloadHeader), payload...), []byte(command.TransactionId))
        return err