c2b988d16171e1c25bc04e970ea6809646d0b521
[nonrtric/rapp/ransliceassurance.git] / messageHandler.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package sliceassurance
22
23 import (
24         "bytes"
25         "encoding/json"
26         "io"
27         "io/ioutil"
28
29         log "github.com/sirupsen/logrus"
30         "oransc.org/usecase/oduclosedloop/icsversion/internal/structures"
31         "oransc.org/usecase/oduclosedloop/icsversion/messages"
32 )
33
34 type MessageHandler struct {
35         data *structures.SliceAssuranceMeas
36 }
37
38 func NewMessageHandler(data *structures.SliceAssuranceMeas) *MessageHandler {
39         return &MessageHandler{
40                 data: data,
41         }
42 }
43
44 func (handler MessageHandler) ProcessMessage(body io.ReadCloser) {
45         log.Debug("Process messages from DMaaP mediator")
46
47         if messages := getVesMessages(body); messages != nil {
48                 stdMessages := getStdMessages(messages)
49
50                 for _, message := range stdMessages {
51                         for _, meas := range message.GetMeasurements() {
52                                 log.Infof("Create sliceMetric and check if metric exist and update existing one or create new one measurement:  %+v\n", meas)
53                                 //Create sliceMetric and check if metric exist and update existing one or create new one
54                                 if _, err := handler.data.AddOrUpdateMetric(meas); err != nil {
55                                         log.Error("Metric could not be added ", err)
56                                 }
57                         }
58                 }
59         }
60
61 }
62
63 func getVesMessages(r io.ReadCloser) *[]string {
64         var messages []string
65         body, err := ioutil.ReadAll(r)
66         if err != nil {
67                 log.Warn(err)
68                 return nil
69         }
70         if bytes.HasPrefix(body, []byte("{")) {
71                 messages = append(messages, string(body))
72         } else {
73                 err = json.Unmarshal(body, &messages)
74                 if err != nil {
75                         log.Warn(err)
76                         return nil
77                 }
78         }
79         return &messages
80 }
81
82 func getStdMessages(messageStrings *[]string) []messages.StdDefinedMessage {
83         stdMessages := make([]messages.StdDefinedMessage, 0, len(*messageStrings))
84         for _, msgString := range *messageStrings {
85                 var message messages.StdDefinedMessage
86                 if err := json.Unmarshal([]byte(msgString), &message); err == nil {
87                         stdMessages = append(stdMessages, message)
88                 } else {
89                         log.Warn(err)
90                 }
91         }
92         return stdMessages
93 }