Merge "RANPM Data File Collector - i-release - o-ran-sc/nonrtric-plt-ranpm-datafileco...
[nonrtric/plt/ranpm.git] / pm-file-converter / main.go
// -
//
//      ========================LICENSE_START=================================
//      O-RAN-SC
//      %%
//      Copyright (C) 2023: Nordix Foundation. All rights reserved.
//      Copyright (C) 2023 OpenInfra Foundation Europe. All rights reserved.
//      %%
//      Licensed under the Apache License, Version 2.0 (the "License");
//      you may not use this file except in compliance with the License.
//      You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
//      Unless required by applicable law or agreed to in writing, software
//      distributed under the License is distributed on an "AS IS" BASIS,
//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//      See the License for the specific language governing permissions and
//      limitations under the License.
//      ========================LICENSE_END===================================
package main

import (
        "fmt"
        "main/common/dataTypes"
        "main/common/utils"
        "main/components/kafkacollector"
        "net/http"
        "os"
        "os/signal"
        "runtime"
        "sync"
        "syscall"
        "time"

        jsoniter "github.com/json-iterator/go"
        log "github.com/sirupsen/logrus"
)

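// Environment configuration read at startup. SELF, KAFKA_SERVER and ICS are
// mandatory and checked in main(). KP, when set, is appended to the producer
// instance name; FILES_VOLUME is the file volume path passed to the
// xml-file-data job; CREDS_GRANT_TYPE is only needed when SASL is used
// towards Kafka.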
var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")

// This is optional - set only if the SASL protocol is used towards Kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")

var bootstrapserver = os.Getenv("KAFKA_SERVER")

const config_file = "application_configuration.json"
const producer_name = "kafka-producer"

var producer_instance_name string = producer_name

const reader_queue_length = 100 // Per type job
const writer_queue_length = 100 // Per info job

var files_volume = os.Getenv("FILES_VOLUME")

// Shared output channel from all type jobs to the topic writer
var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
var writer_control = make(chan dataTypes.WriterControl, 1)

// Registration retry delays in seconds, see periodicRegistration
const registration_delay_short = 2
const registration_delay_long = 120

const failedMessageLabel = " - failed"

//== Variables ==//

var AppState = Init

// Lock for all internal data
var datalock sync.Mutex

const (
        Init dataTypes.AppStates = iota
        Running
        Terminating
)
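
// The application moves from Init to Running at startup and to Terminating
// when a SIGINT/SIGTERM is received (see the signal handler in main).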

const registeringProducer = "Registering producer: "

// == Main ==//
func main() {

        //log.SetLevel(log.InfoLevel)
        log.SetLevel(log.TraceLevel)

        log.Info("Server starting...")

        if self == "" {
                log.Panic("Env SELF not configured")
        }
        if bootstrapserver == "" {
                log.Panic("Env KAFKA_SERVER not set")
        }
        if ics_server == "" {
                log.Panic("Env ICS not set")
        }
        if os.Getenv("KP") != "" {
                producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
        }

        go kafkacollector.StartTopicWriter(writer_control, data_out_channel)

        // Set up proc for periodic type registration
        var eventChan = make(chan int) // Channel for stopping the proc
        go periodicRegistration(eventChan)

        // Wait for a term/int signal, then try to shut down gracefully
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
        go func() {
                sig := <-sigs
                fmt.Printf("Received signal %s - application will terminate\n", sig)
                eventChan <- 0 // Stop periodic registration
                datalock.Lock()
                defer datalock.Unlock()
                AppState = Terminating
        }()

        AppState = Running

        // Wait until all goroutines have exited. Note that runtime.Goexit
        // terminates the main goroutine without returning, so the prints
        // below are never reached.
        runtime.Goexit()

        fmt.Println("main routine exit")
        fmt.Println("server stopped")
}

// == Core functions ==//

// Run periodic registration of producers.
// The first attempt is made after one second. On success the registration is
// refreshed every registration_delay_long seconds; on failure the delay grows
// by registration_delay_short until it reaches registration_delay_long and
// then restarts from registration_delay_short.
func periodicRegistration(evtch chan int) {
        var delay int = 1
        for {
                select {
                case msg := <-evtch:
                        if msg == 0 { // Stop thread
                                return
                        }
                case <-time.After(time.Duration(delay) * time.Second):
                        ok := registerProducer()
                        if ok {
                                delay = registration_delay_long
                        } else {
                                if delay < registration_delay_long {
                                        delay += registration_delay_short
                                } else {
                                        delay = registration_delay_short
                                }
                        }
                }
        }
}

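// registerProducer reads the local configuration file and registers every
// produced info type with ICS before starting a type job per type.
// A minimal application_configuration.json might look roughly like this
// (illustrative sketch only - the authoritative JSON tags are defined on
// dataTypes.DataTypes/DataType):
//
//      {
//        "types": [
//          {
//            "id": "xml-file-data",
//            "kafkaInputTopic": "file-ready"
//          }
//        ]
//      }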
func registerProducer() bool {

        log.Info(registeringProducer, producer_instance_name)

        file, err := os.ReadFile(config_file)
        if err != nil {
                log.Error("Cannot read config file: ", config_file)
                // NOSONAR
                log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                return false
        }
        data := dataTypes.DataTypes{}
        err = jsoniter.Unmarshal(file, &data)
        if err != nil {
                log.Error("Cannot parse config file: ", config_file)
                // NOSONAR
                log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                return false
        }
        var newTypeNames []string

        for i := 0; i < len(data.ProdDataTypes); i++ {
                t1 := make(map[string]interface{})
                t2 := make(map[string]interface{})

                t2["schema"] = "http://json-schema.org/draft-07/schema#"
                t2["title"] = data.ProdDataTypes[i].ID
                t2["description"] = data.ProdDataTypes[i].ID
                t2["type"] = "object"

                t1["info_job_data_schema"] = t2

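                // For a type <id> the marshalled registration body becomes:
                //   {"info_job_data_schema":{"schema":"http://json-schema.org/draft-07/schema#",
                //    "title":"<id>","description":"<id>","type":"object"}}
                // and is PUT to http://<ICS>/data-producer/v1/info-types/<id> below.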
                json, err := jsoniter.Marshal(t1)
                if err != nil {
                        log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
                        // NOSONAR
                        log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                        return false
                } else {
                        ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
                        if !ok {
                                log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
                                // NOSONAR
                                log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                                return false
                        }
                        newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
                }

        }

        log.Debug("Registering types: ", newTypeNames)
        datalock.Lock()
        defer datalock.Unlock()

        for _, v := range data.ProdDataTypes {
                log.Info("Adding type job for type: ", v.ID)
                startTypeJob(v)
        }

        dataTypes.InfoTypes = data
        log.Debug("Datatypes: ", dataTypes.InfoTypes)
        log.Info(registeringProducer, producer_instance_name, " - OK")
        return true
}

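// startTypeJob wires up one job per info type: a Kafka topic reader feeding
// the per-type input channel, and a converter goroutine writing results to
// the shared output channel. The wiring is kept in a TypeJobRecord stored in
// dataTypes.TypeJobs.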
func startTypeJob(dp dataTypes.DataType) {
        log.Info("Starting type job: ", dp.ID)
        jobRecord := dataTypes.TypeJobRecord{}

        jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
        jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
        jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
        jobRecord.InfoType = dp.ID
        jobRecord.InputTopic = dp.KafkaInputTopic
        jobRecord.GroupId = "kafka-procon-" + dp.ID
        jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")

        switch dp.ID {
        case "xml-file-data-to-filestore":
                go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
        case "xml-file-data":
                go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
        default:
                // No converter job for unknown types - only the topic reader is started
        }

        go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)

        dataTypes.TypeJobs[dp.ID] = jobRecord
        log.Debug("Type job input type: ", dp.InputJobType)
}