1 // This source code is part of the near-RT RIC (RAN Intelligent Controller)
2 // platform project (RICP).
4 // Copyright 2019 Nokia
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
10 // http://www.apache.org/licenses/LICENSE-2.0
12 // Unless required by applicable law or agreed to in writing, software
13 // distributed under the License is distributed on an "AS IS" BASIS,
14 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 // See the License for the specific language governing permissions and
16 // limitations under the License.
19 // This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 // platform project (RICP).
27 "github.com/pkg/errors"
39 var configuration = make(map[string]*models.JsonCommand)
41 // Rmr Message Id -> Command
42 var waitForRmrMessageType = make(map[int]*models.JsonCommand)
44 func addRmrMessageToWaitFor(rmrMessageToWaitFor string, command models.JsonCommand) error {
45 rmrMsgId, err := rmr.MessageIdToUint(rmrMessageToWaitFor)
48 return errors.New(fmt.Sprintf("invalid rmr message id: %s", rmrMessageToWaitFor))
51 waitForRmrMessageType[int(rmrMsgId)] = &command
55 type Dispatcher struct {
56 rmrService *rmr.Service
57 processResult models.ProcessResult
59 jsonSender *sender.JsonSender
62 func (d *Dispatcher) GetProcessResult() models.ProcessResult {
63 return d.processResult
66 func New(logger *logger.Logger, rmrService *rmr.Service, jsonSender *sender.JsonSender) *Dispatcher {
68 rmrService: rmrService,
70 jsonSender: jsonSender,
74 func (d *Dispatcher) JsonCommandsDecoderCB(cmd models.JsonCommand) error {
76 return errors.New(fmt.Sprintf("invalid cmd, no id"))
78 configuration[cmd.Id] = &cmd
81 // if len(cmd.ReceiveCommandId) == 0 {
85 // return addRmrMessageToWaitFor(cmd.ReceiveCommandId, cmd)
88 func (d *Dispatcher) sendNoRepeat(command models.JsonCommand) error {
90 if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
92 d.processResult.StartTime = &now
95 err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
98 d.logger.Errorf("#Dispatcher.sendNoRepeat - error sending rmr message: %s", err)
99 d.processResult.Err = err
100 d.processResult.Stats.SentErrorCount.Inc()
104 d.processResult.Stats.SentCount.Inc()
108 func (d *Dispatcher) sendWithRepeat(ctx context.Context, command models.JsonCommand) {
110 if enums.CommandAction(command.Action) == enums.SendRmrMessage && d.processResult.StartTime == nil {
112 d.processResult.StartTime = &now
115 for repeatCount := command.RepeatCount; repeatCount > 0; repeatCount-- {
123 err := d.jsonSender.SendJsonRmrMessage(command, nil, d.rmrService)
126 d.logger.Errorf("#Dispatcher.sendWithRepeat - error sending rmr message: %s", err)
127 d.processResult.Stats.SentErrorCount.Inc()
131 d.processResult.Stats.SentCount.Inc()
132 time.Sleep(time.Duration(command.RepeatDelayInMs) * time.Millisecond)
136 func getReceiveRmrMessageType(receiveCommandId string) (string, error) {
137 command, ok := configuration[receiveCommandId]
140 return "", errors.New(fmt.Sprintf("invalid receive command id: %s", receiveCommandId))
143 if len(command.RmrMessageType) == 0 {
144 return "", errors.New(fmt.Sprintf("missing RmrMessageType for command id: %s", receiveCommandId))
147 return command.RmrMessageType, nil
150 func (d *Dispatcher) sendHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
152 defer sendAndReceiveWg.Done()
153 var listenAndHandleWg sync.WaitGroup
155 if len(command.ReceiveCommandId) > 0 {
156 rmrMessageToWaitFor, err := getReceiveRmrMessageType(command.ReceiveCommandId)
159 d.processResult.Err = err
163 err = addRmrMessageToWaitFor(rmrMessageToWaitFor, command)
166 d.processResult.Err = err
170 listenAndHandleWg.Add(1)
171 go d.listenAndHandle(ctx, &listenAndHandleWg, command)
174 if command.RepeatCount == 0 {
175 err := d.sendNoRepeat(command)
182 d.sendWithRepeat(ctx, command)
185 if len(command.ReceiveCommandId) > 0 {
186 listenAndHandleWg.Wait()
190 func (d *Dispatcher) receiveHandler(ctx context.Context, sendAndReceiveWg *sync.WaitGroup, command models.JsonCommand) {
192 defer sendAndReceiveWg.Done()
194 err := addRmrMessageToWaitFor(command.RmrMessageType, command)
197 d.processResult.Err = err
201 var listenAndHandleWg sync.WaitGroup
202 listenAndHandleWg.Add(1) // this is due to the usage of listenAndHandle as a goroutine in the sender case
203 d.listenAndHandle(ctx, &listenAndHandleWg, command)
206 func getMergedCommand(cmd *models.JsonCommand) (models.JsonCommand, error) {
207 var command models.JsonCommand
208 if len(cmd.Id) == 0 {
209 return command, errors.New(fmt.Sprintf("invalid command, no id"))
214 conf, ok := configuration[cmd.Id]
218 mergeConfigurationAndCommand(&command, cmd)
224 func (d *Dispatcher) ProcessJsonCommand(ctx context.Context, cmd *models.JsonCommand) {
226 command, err := getMergedCommand(cmd)
229 d.processResult.Err = err
233 var sendAndReceiveWg sync.WaitGroup
235 commandAction := enums.CommandAction(command.Action)
237 switch commandAction {
239 case enums.SendRmrMessage:
240 sendAndReceiveWg.Add(1)
241 go d.sendHandler(ctx, &sendAndReceiveWg, command)
242 case enums.ReceiveRmrMessage:
243 sendAndReceiveWg.Add(1)
244 go d.receiveHandler(ctx, &sendAndReceiveWg, command)
246 d.processResult = models.ProcessResult{Err: errors.New(fmt.Sprintf("invalid command action %s", command.Action))}
250 sendAndReceiveWg.Wait()
253 func getResponseCommand(command models.JsonCommand) (*models.JsonCommand, error) {
254 responseCommand, ok := configuration[command.SendCommandId]
257 return nil, errors.New(fmt.Sprintf("invalid SendCommandId %s", command.SendCommandId))
260 return responseCommand, nil
263 func (d *Dispatcher) listenAndHandleNoRepeat(ctx context.Context, command models.JsonCommand) {
271 mbuf, err := d.rmrService.RecvMessage()
274 d.logger.Errorf("#Dispatcher.listenAndHandleNoRepeat - error receiving message: %s", err)
275 d.processResult.Err = err
276 d.processResult.Stats.ReceivedErrorCount.Inc()
280 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
282 d.processResult.StartTime = &now
285 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
287 _, ok := waitForRmrMessageType[mbuf.MType]
290 d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received unexpected msg: %s", messageInfo)
291 d.processResult.Stats.ReceivedUnexpectedCount.Inc()
295 d.logger.Infof("#Dispatcher.listenAndHandleNoRepeat - received expected msg: %s", messageInfo)
296 d.processResult.Stats.ReceivedExpectedCount.Inc()
298 if len(command.SendCommandId) > 0 {
299 responseCommand, err := getResponseCommand(command)
302 d.processResult.Err = err
306 _ = d.sendNoRepeat(*responseCommand)
313 func (d *Dispatcher) listenAndHandleWithRepeat(ctx context.Context, command models.JsonCommand) {
315 var responseCommand *models.JsonCommand
317 if len(command.SendCommandId) > 0 {
319 responseCommand, err = getResponseCommand(command)
322 d.processResult.Err = err
327 for d.processResult.Stats.ReceivedExpectedCount.Load() < int32(command.RepeatCount) {
334 mbuf, err := d.rmrService.RecvMessage()
337 d.logger.Errorf("#Dispatcher.listenAndHandleWithRepeat - error receiving message: %s", err)
338 d.processResult.Stats.ReceivedErrorCount.Inc()
342 if enums.CommandAction(command.Action) == enums.ReceiveRmrMessage && d.processResult.StartTime == nil {
344 d.processResult.StartTime = &now
347 messageInfo := models.NewMessageInfo(mbuf.MType, mbuf.Meid, mbuf.Payload, mbuf.XAction)
349 _, ok := waitForRmrMessageType[mbuf.MType]
352 d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received unexpected msg: %s", messageInfo)
353 d.processResult.Stats.ReceivedUnexpectedCount.Inc()
357 d.logger.Infof("#Dispatcher.listenAndHandleWithRepeat - received expected msg: %s", messageInfo)
358 d.processResult.Stats.ReceivedExpectedCount.Inc()
360 if responseCommand != nil {
361 _ = d.sendNoRepeat(*responseCommand) // TODO: goroutine? + error handling
366 func (d *Dispatcher) listenAndHandle(ctx context.Context, listenAndHandleWg *sync.WaitGroup, command models.JsonCommand) {
368 defer listenAndHandleWg.Done()
370 if command.RepeatCount == 0 {
371 d.listenAndHandleNoRepeat(ctx, command)
375 d.listenAndHandleWithRepeat(ctx, command)
378 func mergeConfigurationAndCommand(conf *models.JsonCommand, cmd *models.JsonCommand) {
379 nFields := reflect.Indirect(reflect.ValueOf(cmd)).NumField()
381 for i := 0; i < nFields; i++ {
382 if fieldValue := reflect.Indirect(reflect.ValueOf(cmd)).Field(i); fieldValue.IsValid() {
383 switch fieldValue.Kind() {
385 if fieldValue.Len() > 0 {
386 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
388 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
389 if fieldValue.Int() != 0 {
390 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
393 if fieldValue.Bool() {
394 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
396 case reflect.Float64, reflect.Float32:
397 if fieldValue.Float() != 0 {
398 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)
401 reflect.Indirect(reflect.ValueOf(conf)).Field(i).Set(fieldValue)