// -
//
//      ========================LICENSE_START=================================
//      O-RAN-SC
//      %%
//      Copyright (C) 2023: Nordix Foundation
//      %%
//      Licensed under the Apache License, Version 2.0 (the "License");
//      you may not use this file except in compliance with the License.
//      You may obtain a copy of the License at
//
//           http://www.apache.org/licenses/LICENSE-2.0
//
//      Unless required by applicable law or agreed to in writing, software
//      distributed under the License is distributed on an "AS IS" BASIS,
//      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//      See the License for the specific language governing permissions and
//      limitations under the License.
//      ========================LICENSE_END===================================
package main

import (
        "fmt"
        "net/http"
        "os"
        "os/signal"
        "runtime"
        "sync"
        "syscall"
        "time"

        jsoniter "github.com/json-iterator/go"
        log "github.com/sirupsen/logrus"

        "main/common/dataTypes"
        "main/common/utils"
        "main/components/kafkacollector"
)
var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")
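
// Example environment for a local run. The values below are illustrative
// assumptions only, not defaults taken from the repository; main() panics
// if any of the three is unset:
//
//      SELF=http://pm-producer:8084
//      ICS=informationservice:8083
//      KAFKA_SERVER=kafka-bootstrap:9092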

// These are optional - set only if the SASL protocol is used towards Kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")

var bootstrapserver = os.Getenv("KAFKA_SERVER")

const config_file = "application_configuration.json"
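// Sketch of the expected file content, inferred from how registerProducer
// parses it into dataTypes.DataTypes below. The JSON key names and values
// are guesses based on the Go field names (ProdDataTypes, ID,
// KafkaInputTopic); check the struct tags in dataTypes for the real keys:
//
//      {
//        "types": [
//          { "id": "xml-file-data-to-filestore", "kafkaInputTopic": "file-ready" }
//        ]
//      }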
const producer_name = "kafka-producer"

var producer_instance_name = producer_name

const reader_queue_length = 100 // Per type job
const writer_queue_length = 100 // Per info job

var files_volume = os.Getenv("FILES_VOLUME")

var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
var writer_control = make(chan dataTypes.WriterControl, 1)

const registration_delay_short = 2
const registration_delay_long = 120
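
// Registration backoff (see periodicRegistration): while registration keeps
// failing, the retry delay grows by registration_delay_short seconds; once it
// reaches registration_delay_long it wraps back to registration_delay_short.
// After a success, the producer re-registers every registration_delay_long
// seconds.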

const failedMessageLabel = " - failed"

//== Variables ==//

var AppState = Init

// Lock for all internal data
var datalock sync.Mutex

const (
        Init dataTypes.AppStates = iota
        Running
        Terminating
)

const registeringProducer = "Registering producer: "

// == Main ==//
func main() {

        //log.SetLevel(log.InfoLevel)
        log.SetLevel(log.TraceLevel)

        log.Info("Server starting...")

        if self == "" {
                log.Panic("Env SELF not configured")
        }
        if bootstrapserver == "" {
                log.Panic("Env KAFKA_SERVER not set")
        }
        if ics_server == "" {
                log.Panic("Env ICS not set")
        }
        if os.Getenv("KP") != "" {
                producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
        }

        go kafkacollector.StartTopicWriter(writer_control, data_out_channel)

        // Set up goroutine for periodic type registration
        var eventChan = make(chan int) // Channel for stopping the goroutine
        go periodicRegistration(eventChan)

        // Wait for a term/int signal to try to shut down gracefully
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
        go func() {
                sig := <-sigs
                fmt.Printf("Received signal %s - application will terminate\n", sig)
                eventChan <- 0 // Stop periodic registration
                datalock.Lock()
                defer datalock.Unlock()
                AppState = Terminating
        }()

        AppState = Running

        // Wait until all goroutines have exited
        runtime.Goexit()

        fmt.Println("main routine exit")
        fmt.Println("server stopped")
}

// == Core functions ==//
// Run periodic registration of producers
func periodicRegistration(evtch chan int) {
        delay := 1
        for {
                select {
                case msg := <-evtch:
                        if msg == 0 { // Stop goroutine
                                return
                        }
                case <-time.After(time.Duration(delay) * time.Second):
                        ok := registerProducer()
                        if ok {
                                delay = registration_delay_long
                        } else {
                                if delay < registration_delay_long {
                                        delay += registration_delay_short
                                } else {
                                        delay = registration_delay_short
                                }
                        }
                }
        }
}

func registerProducer() bool {

        log.Info(registeringProducer, producer_instance_name)

        file, err := os.ReadFile(config_file)
        if err != nil {
                log.Error("Cannot read config file: ", config_file)
                // NOSONAR
                log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                return false
        }
        data := dataTypes.DataTypes{}
        err = jsoniter.Unmarshal(file, &data)
        if err != nil {
                log.Error("Cannot parse config file: ", config_file)
                // NOSONAR
                log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                return false
        }
        var newTypeNames []string

        for i := 0; i < len(data.ProdDataTypes); i++ {
                t1 := make(map[string]interface{})
                t2 := make(map[string]interface{})

                t2["schema"] = "http://json-schema.org/draft-07/schema#"
                t2["title"] = data.ProdDataTypes[i].ID
                t2["description"] = data.ProdDataTypes[i].ID
                t2["type"] = "object"

                t1["info_job_data_schema"] = t2
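
                // The marshalled registration body sent to ICS then has the shape:
                //
                //      {"info_job_data_schema": {"schema": "http://json-schema.org/draft-07/schema#",
                //       "title": "<type id>", "description": "<type id>", "type": "object"}}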

                json, err := jsoniter.Marshal(t1)
                if err != nil {
                        log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
                        // NOSONAR
                        log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                        return false
                }
                ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
                if !ok {
                        log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
                        // NOSONAR
                        log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                        return false
                }
                newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
        }

        log.Debug("Registering types: ", newTypeNames)
        datalock.Lock()
        defer datalock.Unlock()

        for _, v := range data.ProdDataTypes {
                log.Info("Adding type job for type: ", v.ID)
                startTypeJob(v)
        }

        dataTypes.InfoTypes = data
        log.Debug("Datatypes: ", dataTypes.InfoTypes)
        log.Info(registeringProducer, producer_instance_name, " - OK")
        return true
}

func startTypeJob(dp dataTypes.DataType) {
        log.Info("Starting type job: ", dp.ID)
        jobRecord := dataTypes.TypeJobRecord{}

        jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
        jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
        jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
        jobRecord.InfoType = dp.ID
        jobRecord.InputTopic = dp.KafkaInputTopic
        jobRecord.GroupId = "kafka-procon-" + dp.ID
        jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
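
        // KP is appended to the Kafka client id here, and to
        // producer_instance_name in main, so that ids stay unique when
        // several converter instances consume the same topics.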

        switch dp.ID {
        case "xml-file-data-to-filestore":
                go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
        case "xml-file-data":
                go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
        default:
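                // Unknown type ids get no converter job; only the topic
                // reader below is started for them.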
        }

        go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)

        dataTypes.TypeJobs[dp.ID] = jobRecord
        log.Debug("Type job input type: ", dp.InputJobType)
}
242 }