"context"
"fmt"
"github.com/pkg/errors"
- "log"
"reflect"
"sync"
"time"
"xappmock/enums"
+ "xappmock/logger"
"xappmock/models"
"xappmock/rmr"
"xappmock/sender"
type Dispatcher struct {
rmrService *rmr.Service
processResult models.ProcessResult
+ logger *logger.Logger
+ jsonSender *sender.JsonSender
}
func (d *Dispatcher) GetProcessResult() models.ProcessResult {
return d.processResult
}
-func New(rmrService *rmr.Service) *Dispatcher {
+func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
return &Dispatcher{
rmrService: rmrService,
+ logger: logger,
+ jsonSender: jsonSender,
}
}
}
func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
- err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+
+ if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
if err != nil {
- log.Printf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
+ d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
d.processResult.Err = err
d.processResult.Stats.SentErrorCount.Inc()
return err
}
func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
+
+ if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
select {
default:
}
- err := sender.SendJsonRmrMessage(command, nil, d.rmrService)
+ err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
if err != nil {
- log.Printf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
+ d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
d.processResult.Stats.SentErrorCount.Inc()
continue
}
mbuf, err := d.rmrService.RecvMessage()
if err != nil {
- log.Printf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
+ d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
d.processResult.Err = err
d.processResult.Stats.ReceivedErrorCount.Inc()
return
}
- messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+ if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
+ d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
d.processResult.Stats.ReceivedUnexpectedCount.Inc()
continue
}
- log.Printf("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
+ d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
d.processResult.Stats.ReceivedExpectedCount.Inc()
if len(command.SendCommandId) > 0 {
mbuf, err := d.rmrService.RecvMessage()
if err != nil {
- log.Printf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
+ d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
d.processResult.Stats.ReceivedErrorCount.Inc()
continue
}
- messageInfo := models.GetMessageInfoAsJson(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
+ if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
+ now := time.Now()
+ d.processResult.StartTime = &now
+ }
+
+ messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
_, ok := waitForRmrMessageType[mbuf.MType]
if !ok {
- log.Printf("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
+ d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
d.processResult.Stats.ReceivedUnexpectedCount.Inc()
continue
}
- log.Printf("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
+ d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
d.processResult.Stats.ReceivedExpectedCount.Inc()
if responseCommand != nil {
require (
github.com/pkg/errors v0.8.1
go.uber.org/atomic v1.5.0
+ go.uber.org/zap v1.13.0
)
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
+go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
+go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
+go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
+go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU=
+go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
+golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
+honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
--- /dev/null
+//
+// Copyright 2019 AT&T Intellectual Property
+// Copyright 2019 Nokia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package logger
+
+import (
+ "fmt"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+ "strings"
+ "time"
+)
+
+type Logger struct {
+ Logger *zap.Logger
+}
+
+// Copied from zap logger
+//
+// A Level is a logging priority. Higher levels are more important.
+type LogLevel int8
+
+const (
+ // DebugLevel logs are typically voluminous, and are usually disabled in
+ // production.
+ DebugLevel LogLevel = iota - 1
+ // InfoLevel is the default logging priority.
+ InfoLevel
+ // WarnLevel logs are more important than Info, but don't need individual
+ // human review.
+ WarnLevel
+ // ErrorLevel logs are high-priority. If an application is running smoothly,
+ // it shouldn't generate any error-level logs.
+ ErrorLevel
+ // DPanicLevel logs are particularly important errors. In development the
+ // logger panics after writing the message.
+ DPanicLevel
+ // PanicLevel logs a message, then panics.
+ PanicLevel
+ // FatalLevel logs a message, then calls os.Exit(1).
+ FatalLevel
+
+ _minLevel = DebugLevel
+ _maxLevel = FatalLevel
+)
+
+var logLevelTokenToLevel = map[string] LogLevel {
+ "debug" : DebugLevel,
+ "info": InfoLevel,
+ "warn": WarnLevel,
+ "error": ErrorLevel,
+ "dpanic": DPanicLevel,
+ "panic": PanicLevel,
+ "fatal": FatalLevel,
+}
+
+func LogLevelTokenToLevel(level string) (LogLevel, bool) {
+ if level, ok := logLevelTokenToLevel[strings.TrimSpace(strings.ToLower(level))];ok {
+ return level, true
+ }
+ return _maxLevel+1, false
+}
+
+func InitLogger(requested LogLevel) (*Logger, error) {
+ var logger *zap.Logger
+ var err error
+ switch requested {
+ case DebugLevel:
+ logger, err = initLoggerByLevel(zapcore.DebugLevel)
+ case InfoLevel:
+ logger, err = initLoggerByLevel(zapcore.InfoLevel)
+ case WarnLevel:
+ logger, err = initLoggerByLevel(zapcore.WarnLevel)
+ case ErrorLevel:
+ logger, err = initLoggerByLevel(zapcore.ErrorLevel)
+ case DPanicLevel:
+ logger, err = initLoggerByLevel(zapcore.DPanicLevel)
+ case PanicLevel:
+ logger, err = initLoggerByLevel(zapcore.PanicLevel)
+ case FatalLevel:
+ logger, err = initLoggerByLevel(zapcore.FatalLevel)
+ default:
+ err = fmt.Errorf("Invalid logging Level :%d",requested)
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &Logger{Logger:logger}, nil
+
+}
+func(l *Logger)Sync() error {
+ l.Debugf("#logger.Sync - Going to flush buffered log")
+ return l.Logger.Sync()
+}
+
+func (l *Logger)Infof(formatMsg string, a ...interface{}) {
+ if l.InfoEnabled() {
+ msg := fmt.Sprintf(formatMsg, a...)
+ l.Logger.Info(msg, zap.Any("mdc", l.getTimeStampMdc()))
+ }
+}
+
+func (l *Logger)Debugf(formatMsg string, a ...interface{}) {
+ if l.DebugEnabled(){
+ msg := fmt.Sprintf(formatMsg, a...)
+ l.Logger.Debug(msg, zap.Any("mdc", l.getTimeStampMdc()))
+ }
+}
+
+func (l *Logger)Errorf(formatMsg string, a ...interface{}) {
+ msg := fmt.Sprintf(formatMsg, a...)
+ l.Logger.Error(msg, zap.Any("mdc", l.getTimeStampMdc()))
+}
+
+func (l *Logger)Warnf(formatMsg string, a ...interface{}) {
+ msg := fmt.Sprintf(formatMsg, a...)
+ l.Logger.Warn(msg, zap.Any("mdc", l.getTimeStampMdc()))
+}
+
+func (l *Logger) getTimeStampMdc() map[string]string {
+ timeStr := time.Now().Format("2006-01-02 15:04:05.000")
+ mdc := map[string]string{"time": timeStr}
+ return mdc
+}
+
+func (l *Logger)InfoEnabled()bool{
+ return l.Logger.Core().Enabled(zap.InfoLevel)
+}
+
+func (l *Logger)DebugEnabled()bool{
+ return l.Logger.Core().Enabled(zap.DebugLevel)
+}
+
+func (l *Logger)DPanicf(formatMsg string, a ...interface{}) {
+ msg := fmt.Sprintf(formatMsg, a...)
+ l.Logger.DPanic(msg, zap.Any("mdc", l.getTimeStampMdc()))
+}
+
+func initLoggerByLevel(l zapcore.Level) (*zap.Logger, error) {
+ cfg := zap.Config{
+ Encoding: "json",
+ Level: zap.NewAtomicLevelAt(l),
+ OutputPaths: []string{"stdout"},
+ ErrorOutputPaths: []string{"stderr"},
+ EncoderConfig: zapcore.EncoderConfig{
+ MessageKey: "msg",
+
+ LevelKey: "crit",
+ EncodeLevel: zapcore.CapitalLevelEncoder,
+
+ TimeKey: "ts",
+ EncodeTime: epochMillisIntegerTimeEncoder,
+
+ CallerKey: "id",
+ EncodeCaller: xAppMockCallerEncoder,
+ },
+ }
+ return cfg.Build()
+}
+
+func xAppMockCallerEncoder(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) {
+ enc.AppendString("xAppMock")
+}
+
+func epochMillisIntegerTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
+ nanos := t.UnixNano()
+ millis := int64(nanos) / int64(time.Millisecond)
+ enc.AppendInt64(millis)
+}
+
import (
"context"
"flag"
- "log"
+ "fmt"
"os"
"os/signal"
"strconv"
"time"
"xappmock/dispatcher"
"xappmock/frontend"
+ "xappmock/logger"
"xappmock/rmr"
+ "xappmock/sender"
)
const (
var rmrService *rmr.Service
func main() {
+
+ logLevel, _ := logger.LogLevelTokenToLevel("info")
+ logger, err := logger.InitLogger(logLevel)
+ if err != nil {
+ fmt.Printf("#app.main - failed to initialize logger, error: %s", err)
+ os.Exit(1)
+ }
+
var rmrContext *rmr.Context
var rmrConfig = rmr.Config{Port: RMR_PORT_DEFAULT, MaxMsgSize: rmr.RMR_MAX_MSG_SIZE, MaxRetries: 10, Flags: 0}
if port, err := strconv.ParseUint(os.Getenv(ENV_RMR_PORT), 10, 16); err == nil {
rmrConfig.Port = int(port)
} else {
- log.Printf("#main - %s: %s, using default (%d).", ENV_RMR_PORT, err, RMR_PORT_DEFAULT)
+ logger.Infof("#main - %s: %s, using default (%d).", ENV_RMR_PORT, err, RMR_PORT_DEFAULT)
}
rmrService = rmr.NewService(rmrConfig, rmrContext)
- dispatcherDesc := dispatcher.New(rmrService)
+ jsonSender := sender.NewJsonSender(logger)
+ dispatcherDesc := dispatcher.New(logger, rmrService, jsonSender)
/* Load configuration file*/
- err := frontend.ProcessConfigurationFile("resources", "conf", ".json",
+ err = frontend.ProcessConfigurationFile("resources", "conf", ".json",
func(data []byte) error {
return frontend.JsonCommandsDecoder(data, dispatcherDesc.JsonCommandsDecoderCB)
})
if err != nil {
- log.Fatalf("#main - processing error: %s", err)
+ logger.Errorf("#main - processing error: %s", err)
+ os.Exit(1)
}
- log.Print("#main - xApp Mock is up and running...")
+ logger.Infof("#main - xApp Mock is up and running...")
flag.Parse()
cmd := flag.Arg(0) /*first remaining argument after flags have been processed*/
command, err := frontend.DecodeJsonCommand([]byte(cmd))
if err != nil {
- log.Printf("#main - command decoding error: %s", err)
+ logger.Errorf("#main - command decoding error: %s", err)
rmrService.CloseContext()
- log.Print("#main - xApp Mock is down")
+ logger.Infof("#main - xApp Mock is down")
return
}
go func() {
oscall := <-c
- log.Printf("system call:%+v", oscall)
+ logger.Infof("system call:%+v", oscall)
cancel()
rmrService.CloseContext()
}()
- processStartTime := time.Now()
dispatcherDesc.ProcessJsonCommand(ctx, command)
pr := dispatcherDesc.GetProcessResult()
if pr.Err != nil {
- log.Printf("#main - command processing Error: %s", err)
+ logger.Errorf("#main - command processing Error: %s", err)
}
- processElapsedTimeInMs := float64(time.Since(processStartTime)) / float64(time.Millisecond)
+ if pr.StartTime != nil {
+ processElapsedTimeInMs := float64(time.Since(*pr.StartTime)) / float64(time.Millisecond)
+ logger.Infof("#main - processing (sending/receiving) messages took %.2f ms", processElapsedTimeInMs)
- log.Printf("#main - processing (sending/receiving) messages took %.2f ms", processElapsedTimeInMs)
- log.Printf("#main - process result: %s", pr)
+ }
+ logger.Infof("#main - process stats: %s", pr.Stats)
rmrService.CloseContext() // TODO: called twice
- log.Print("#main - xApp Mock is down")
+ logger.Infof("#main - xApp Mock is down")
}
package models
import (
- "encoding/json"
"fmt"
"time"
)
MessageTimestamp int64 `json:"messageTimestamp"`
MessageType int `json:"messageType"`
Meid string `json:"meid"`
- Payload string `json:"payload"`
+ Payload []byte `json:"payload"`
TransactionId string `json:"transactionId"`
}
-func GetMessageInfoAsJson(messageType int, meid string, payload []byte, transactionId []byte) string {
- messageInfo := MessageInfo{
+func NewMessageInfo(messageType int, meid string, payload []byte, transactionId []byte) MessageInfo {
+ return MessageInfo{
MessageTimestamp: time.Now().Unix(),
MessageType: messageType,
Meid: meid,
- Payload: fmt.Sprintf("%x", payload),
+ Payload: payload,
TransactionId: string(transactionId),
}
+}
- jsonData, _ := json.Marshal(messageInfo)
-
- return string(jsonData)
+func (mi MessageInfo) String() string {
+ return fmt.Sprintf("message timestamp: %d | message type: %d | meid: %s | payload: %x | transaction id: %s",
+ mi.MessageTimestamp, mi.MessageType, mi.Meid, mi.Payload, mi.TransactionId)
}
import (
"fmt"
"go.uber.org/atomic"
+ "time"
)
type ProcessStats struct {
}
type ProcessResult struct {
- Stats ProcessStats
- Err error
+ StartTime *time.Time
+ Stats ProcessStats
+ Err error
}
-func (pr ProcessResult) String() string {
- return fmt.Sprintf("\nNumber of sent messages: %d\nNumber of send errors: %d\n"+
- "Number of expected received messages: %d\nNumber of unexpected received messages: %d\n"+
- "Number of receive errors: %d\n", pr.Stats.SentCount, pr.Stats.SentErrorCount, pr.Stats.ReceivedExpectedCount, pr.Stats.ReceivedUnexpectedCount, pr.Stats.ReceivedErrorCount)
+func (ps ProcessStats) String() string {
+ return fmt.Sprintf("sent messages: %d | send errors: %d | expected received messages: %d | unexpected received messages: %d | receive errors: %d",
+ ps.SentCount, ps.SentErrorCount, ps.ReceivedExpectedCount, ps.ReceivedUnexpectedCount, ps.ReceivedErrorCount)
}
import (
"fmt"
"github.com/pkg/errors"
- "log"
+ "os"
"reflect"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"
+ "xappmock/logger"
"xappmock/models"
"xappmock/rmr"
)
var counter uint64
-func SendJsonRmrMessage(command models.JsonCommand /*the copy is modified locally*/, xAction *[]byte, r *rmr.Service) error {
+type JsonSender struct {
+ logger *logger.Logger
+}
+
+func NewJsonSender(logger *logger.Logger) *JsonSender {
+ return &JsonSender{
+ logger: logger,
+ }
+}
+
+func (s *JsonSender) SendJsonRmrMessage(command models.JsonCommand /*the copy is modified locally*/, xAction *[]byte, r *rmr.Service) error {
var payload []byte
_, err := fmt.Sscanf(command.PackedPayload, "%x", &payload)
if err != nil {
if len(command.TransactionId) == 0 {
command.TransactionId = string(*xAction)
}
- command.PayloadHeader = expandPayloadHeader(command.PayloadHeader, &command)
- log.Printf("#jsonSender.SendJsonRmrMessage - command payload header: %s", command.PayloadHeader)
+ command.PayloadHeader = s.expandPayloadHeader(command.PayloadHeader, &command)
+ s.logger.Infof("#JsonSender.SendJsonRmrMessage - command payload header: %s", command.PayloadHeader)
rmrMsgId, err := rmr.MessageIdToUint(command.RmrMessageType)
if err != nil {
return errors.New(fmt.Sprintf("invalid rmr message id: %s", command.RmrMessageType))
}
msg := append([]byte(command.PayloadHeader), payload...)
- messageInfo := models.GetMessageInfoAsJson(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
- log.Printf("#rmr.Service.SendMessage - %s", messageInfo)
+ messageInfo := models.NewMessageInfo(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
+ s.logger.Infof("#JsonSender.SendJsonRmrMessage - going to send message: %s", messageInfo)
_, err = r.SendMessage(int(rmrMsgId), command.Meid, msg, []byte(command.TransactionId))
return err
* Example: “payloadHeader”: “$ranIp|$ranPort|$ranName|#packedPayload|”
*/
-func expandPayloadHeader(header string, command *models.JsonCommand) string {
+func (s *JsonSender) expandPayloadHeader(header string, command *models.JsonCommand) string {
var name strings.Builder
var expandedHeader strings.Builder
case reflect.Float64, reflect.Float32:
expandedHeader.WriteString(fmt.Sprintf("%g", fieldValue.Float()))
default:
- log.Fatalf("#jsonSender.expandPayloadHeader - invalid type for $%s, value must be a string, an int, a bool or a float", name.String())
+ s.logger.Errorf("#JsonSender.expandPayloadHeader - invalid type for $%s, value must be a string, an int, a bool or a float", name.String())
+ os.Exit(1)
}
}
name.Reset()
if fieldValue.Kind() == reflect.String {
expandedHeader.WriteString(strconv.FormatInt(int64(len(fieldValue.String())), 10))
} else {
- log.Fatalf("#jsonSender.expandPayloadHeader - invalid type for #%s, value must be a string", name.String())
+ s.logger.Errorf("#JsonSender.expandPayloadHeader - invalid type for #%s, value must be a string", name.String())
+ os.Exit(1)
}
}
name.Reset()