CI: Add silent prescan SonarCloud job
[nonrtric/plt/ranpm.git] / pm-rapp / main.go
1 //  ============LICENSE_START===============================================
2 //  Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 //  ========================================================================
4 //  Licensed under the Apache License, Version 2.0 (the "License");
5 //  you may not use this file except in compliance with the License.
6 //  You may obtain a copy of the License at
7 //
8 //       http://www.apache.org/licenses/LICENSE-2.0
9 //
10 //  Unless required by applicable law or agreed to in writing, software
11 //  distributed under the License is distributed on an "AS IS" BASIS,
12 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 //  See the License for the specific language governing permissions and
14 //  limitations under the License.
15 //  ============LICENSE_END=================================================
16 //
17
18 package main
19
20 import (
21         "bytes"
22         "compress/gzip"
23         "context"
24         "encoding/json"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "net/http"
29         "net/http/pprof"
30         "os"
31         "os/signal"
32         "runtime"
33         "strconv"
34         "strings"
35         "syscall"
36         "time"
37
38         "github.com/confluentinc/confluent-kafka-go/kafka"
39         "github.com/gorilla/mux"
40         jsoniter "github.com/json-iterator/go"
41         log "github.com/sirupsen/logrus"
42         "golang.org/x/oauth2/clientcredentials"
43 )
44
45 type JobDefinition struct {
46         InfoTypeID    string `json:"info_type_id"`
47         JobOwner      string `json:"job_owner"`
48         StatusNotificationURI  string `json:"status_notification_uri"`
49         JobDefinition struct {
50                 Filter           json.RawMessage `json:"filter"`
51                 DeliveryInfo     struct {
52                         Topic            string `json:"topic"`
53                         BootStrapServers string `json:"bootStrapServers"`
54                 } `json:"deliveryInfo"`
55         } `json:"job_definition"`
56 }
57
58 const jobdef = "/config/jobDefinition.json"
59
60 var rapp_id = os.Getenv("APPID")
61
62 var rapp_ns = os.Getenv("APPNS")
63
64 var bootstrapserver = os.Getenv("KAFKA_SERVER")
65
66 var topic = os.Getenv("TOPIC")
67
68 var ics_server = os.Getenv("ICS")
69
70 var jwt_file = os.Getenv("JWT_FILE")
71
72 var ssl_path = os.Getenv("SSLPATH")
73
74 var gzipped_data = os.Getenv("GZIP")
75
76 var log_payload = os.Getenv("LOG_PAYLOAD")
77
78 // These are optional - if rapp is fethcing the token instead of the side car
79 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
80 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
81 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
82 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
83
84 var gid = ""
85 var cid = "cid-0"
86
87 var msg_count int = 0
88 var msg_corrupted_count int = 0
89
90 var jobid = "<not-set>"
91 var consumer_type = "<not-set>"
92
93 var currentToken = ""
94
95 var appStatus = "INIT"
96
97 var msg_per_sec int = 0
98
99 var httpclient = &http.Client{}
100
101 // == Main ==//
102 func main() {
103
104         log.SetLevel(log.InfoLevel)
105         log.SetLevel(log.DebugLevel)
106
107         log.Info("Server starting...")
108
109         if creds_service_url != "" {
110                 log.Warn("Disabling jwt retrieval from side car")
111                 jwt_file = ""
112         }
113
114         if rapp_id == "" {
115                 log.Error("Env APPID not set")
116                 os.Exit(1)
117         }
118
119         if rapp_ns == "" {
120                 log.Error("Env APPNS not set")
121                 os.Exit(1)
122         }
123
124         if bootstrapserver == "" {
125                 log.Error("Env KAFKA_SERVER not set")
126                 os.Exit(1)
127         }
128
129         if topic == "" {
130                 log.Error("Env TOPIC not set")
131                 os.Exit(1)
132         }
133
134         if ics_server == "" {
135                 log.Error("Env ICS not set")
136                 os.Exit(1)
137         }
138
139         rtr := mux.NewRouter()
140         rtr.HandleFunc("/statistics", statistics)
141         rtr.HandleFunc("/status", status)
142         rtr.HandleFunc("/logging/{level}", logging_level)
143         rtr.HandleFunc("/logging", logging_level)
144         rtr.HandleFunc("/", alive)
145
146         //For perf/mem profiling
147         rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
148
149         http.Handle("/", rtr)
150
151         fileBytes, err := os.ReadFile(jobdef)
152         if err != nil {
153                 log.Error("Cannot read job defintion file: ", jobdef, err)
154                 os.Exit(1)
155         }
156         fmt.Println("FROM FILE")
157         fmt.Println(string(fileBytes))
158
159         job_json := JobDefinition{}
160         err = jsoniter.Unmarshal([]byte(fileBytes), &job_json)
161         if err != nil {
162                 log.Error("Cannot parse job defintion file: ", jobdef, err)
163                 os.Exit(1)
164         }
165         job_type := job_json.InfoTypeID
166         job_json.JobDefinition.DeliveryInfo.Topic = topic
167         job_json.JobDefinition.DeliveryInfo.BootStrapServers = bootstrapserver
168
169         gid = "pm-rapp-" + job_type + "-" + rapp_id
170
171         jobid = "rapp-job-" + job_type + "-" + rapp_id
172
173         json_bytes, err := json.Marshal(job_json)
174         if err != nil {
175                 log.Error("Cannot marshal job json", err)
176                 os.Exit(1)
177         }
178
179         json_str := string(json_bytes)
180
181         if strings.HasPrefix(bootstrapserver, "http://") {
182                 if creds_service_url != "" {
183                         consumer_type = "accesstoken strimzi bridge consumer"
184                         retrive_token_strimzi()
185                 }
186         } else {
187                 go read_kafka_messages()
188         }
189
190         ok := false
191         if ics_server != "" {
192                 for !ok {
193                         log.Debug("Registring job: ", jobid, " json: ", json_str)
194                         ok, _ = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
195                         if !ok {
196                                 log.Info("Failed to register job: ", jobid, " - retrying")
197                                 time.Sleep(time.Second)
198                         }
199                 }
200         } else {
201                 log.Info("No job registered - read from topic only")
202         }
203         if strings.HasPrefix(bootstrapserver, "http://") {
204                 go read_bridge_messages()
205         }
206
207         go calc_average()
208
209         http_port := "80"
210         http_server := &http.Server{Addr: ":" + http_port, Handler: nil}
211
212         sigs := make(chan os.Signal, 1)
213         signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
214         go func() {
215                 fmt.Println("Setting handler for signal sigint and sigterm")
216                 sig := <-sigs
217                 appStatus = "TERMINATING"
218                 fmt.Printf("Received signal %s - application will terminate\n", sig)
219
220                 if strings.HasPrefix(bootstrapserver, "http://") {
221                         log.Debug("stopping strimzi consumer for job: ", jobid)
222                         ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
223                         if !ok {
224                                 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - retrying")
225                         }
226                 }
227
228                 ok := false
229                 if ics_server != "" {
230                         for !ok {
231                                 log.Debug("stopping job: ", jobid, " json: ", json_str)
232                                 ok, _ = send_http_request(nil, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
233                                 if !ok {
234                                         log.Info("Failed to stop job: ", jobid, " - retrying")
235                                         time.Sleep(time.Second)
236                                 }
237                         }
238                 }
239                 http_server.Shutdown(context.Background())
240         }()
241         appStatus = "RUNNING"
242         log.Info("Starting http service...")
243         err = http_server.ListenAndServe()
244         if err == http.ErrServerClosed { // graceful shutdown
245                 log.Info("http server shutdown...")
246                 os.Exit(1)
247         } else if err != nil {
248                 log.Error("http server error: ", err)
249                 log.Info("http server shutdown...")
250                 os.Exit(1)
251         }
252
253         //Wait until all go routines has exited
254         runtime.Goexit()
255
256         log.Warn("main routine exit")
257         log.Warn("server is stopping...")
258 }
259
260 // Simple alive check
261 func alive(w http.ResponseWriter, req *http.Request) {
262         //Alive check
263 }
264
265 // Get/Set logging level
266 func logging_level(w http.ResponseWriter, req *http.Request) {
267         vars := mux.Vars(req)
268         if level, ok := vars["level"]; ok {
269                 if req.Method == http.MethodPut {
270                         switch level {
271                         case "trace":
272                                 log.SetLevel(log.TraceLevel)
273                         case "debug":
274                                 log.SetLevel(log.DebugLevel)
275                         case "info":
276                                 log.SetLevel(log.InfoLevel)
277                         case "warn":
278                                 log.SetLevel(log.WarnLevel)
279                         case "error":
280                                 log.SetLevel(log.ErrorLevel)
281                         case "fatal":
282                                 log.SetLevel(log.FatalLevel)
283                         case "panic":
284                                 log.SetLevel(log.PanicLevel)
285                         default:
286                                 w.WriteHeader(http.StatusNotFound)
287                         }
288                 } else {
289                         w.WriteHeader(http.StatusMethodNotAllowed)
290                 }
291         } else {
292                 if req.Method == http.MethodGet {
293                         msg := "none"
294                         if log.IsLevelEnabled(log.PanicLevel) {
295                                 msg = "panic"
296                         } else if log.IsLevelEnabled(log.FatalLevel) {
297                                 msg = "fatal"
298                         } else if log.IsLevelEnabled(log.ErrorLevel) {
299                                 msg = "error"
300                         } else if log.IsLevelEnabled(log.WarnLevel) {
301                                 msg = "warn"
302                         } else if log.IsLevelEnabled(log.InfoLevel) {
303                                 msg = "info"
304                         } else if log.IsLevelEnabled(log.DebugLevel) {
305                                 msg = "debug"
306                         } else if log.IsLevelEnabled(log.TraceLevel) {
307                                 msg = "trace"
308                         }
309                         w.Header().Set("Content-Type", "application/text")
310                         w.Write([]byte(msg))
311                 } else {
312                         w.WriteHeader(http.StatusMethodNotAllowed)
313                 }
314         }
315 }
316
317 // Get app state
318 func status(w http.ResponseWriter, req *http.Request) {
319         if req.Method != http.MethodGet {
320                 w.WriteHeader(http.StatusMethodNotAllowed)
321                 return
322         }
323
324         _, err := w.Write([]byte(appStatus))
325         if err != nil {
326                 w.WriteHeader(http.StatusInternalServerError)
327                 log.Error("Cannot send statistics json")
328                 return
329         }
330 }
331
332 // producer statictics, all jobs
333 func statistics(w http.ResponseWriter, req *http.Request) {
334         if req.Method != http.MethodGet {
335                 w.WriteHeader(http.StatusMethodNotAllowed)
336                 return
337         }
338         m := make(map[string]interface{})
339         log.Debug("rapp statictics")
340
341         req.Header.Set("Content-Type", "application/json; charset=utf-8")
342         m["number-of-messages"] = strconv.Itoa(msg_count)
343         m["number-of-corrupted-messages"] = strconv.Itoa(msg_corrupted_count)
344         m["job id"] = jobid
345         m["group id"] = gid
346         m["client id"] = cid
347         m["kafka consumer type"] = consumer_type
348         m["server"] = bootstrapserver
349         m["topic"] = topic
350         m["messages per sec"] = msg_per_sec
351
352         json, err := json.Marshal(m)
353         if err != nil {
354                 w.WriteHeader(http.StatusInternalServerError)
355                 log.Error("Cannot marshal statistics json")
356                 return
357         }
358         _, err = w.Write(json)
359         if err != nil {
360                 w.WriteHeader(http.StatusInternalServerError)
361                 log.Error("Cannot send statistics json")
362                 return
363         }
364 }
365
366 func calc_average() {
367
368         for true {
369                 v := msg_count
370                 time.Sleep(60 * time.Second)
371                 msg_per_sec = (msg_count - v) / 60
372         }
373 }
374
375 func send_http_request(jsonData []byte, method string, url string, contentType string, accessToken string, alt_ok_response int, returnJson bool) (bool, map[string]interface{}) {
376
377         var req *http.Request
378         var err error
379         if jsonData != nil {
380                 req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonData))
381                 if err != nil {
382                         log.Error("Cannot create http request method: ", method, " url: ", url)
383                         return false, nil
384                 }
385                 if contentType == "" {
386                         req.Header.Set("Content-Type", "application/json; charset=utf-8")
387                 } else {
388                         req.Header.Set("Content-Type", contentType)
389                 }
390         } else {
391                 req, err = http.NewRequest(method, url, nil)
392                 if err != nil {
393                         log.Error("Cannot create http request method: ", method, " url: ", url)
394                         return false, nil
395                 }
396         }
397         if jwt_file != "" || creds_service_url != "" {
398                 if accessToken != "" {
399                         req.Header.Set("Authorization", "Bearer "+accessToken)
400                 } else {
401                         log.Error("Cannot create http request for url: ", url, " - token missing")
402                         return false, nil
403                 }
404         }
405         log.Debug("Http request: ", req)
406         resp, err2 := httpclient.Do(req)
407         if err2 != nil {
408                 log.Error("Cannot send http request, method: ", method, "url: ", url)
409         } else {
410                 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
411
412                         if returnJson {
413                                 defer resp.Body.Close()
414                                 body, err3 := ioutil.ReadAll(resp.Body)
415                                 if err3 != nil {
416                                         log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
417                                         return false, nil
418                                 } else {
419                                         var responseJson map[string]interface{}
420                                         err := json.Unmarshal(body, &responseJson)
421                                         if err != nil {
422                                                 log.Error("Received msg not json? - cannot unmarshal")
423                                                 return false, nil
424                                         }
425                                         fmt.Println(string(body))
426                                         log.Debug("Accepted response code: ", resp.StatusCode)
427                                         return true, responseJson
428                                 }
429                         }
430
431                         log.Debug("Accepted response code: ", resp.StatusCode)
432                         return true, nil
433                 } else {
434                         if alt_ok_response != 0 && resp.StatusCode == alt_ok_response {
435
436                                 if returnJson {
437                                         defer resp.Body.Close()
438                                         body, err3 := ioutil.ReadAll(resp.Body)
439                                         if err3 != nil {
440                                                 log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
441                                                 return false, nil
442                                         } else {
443                                                 var responseJson map[string]interface{}
444                                                 err := json.Unmarshal(body, &responseJson)
445                                                 if err != nil {
446                                                         log.Error("Received msg not json? - cannot unmarshal")
447                                                         return false, nil
448                                                 }
449                                                 fmt.Println(string(body))
450                                                 log.Debug("Accepted alternative response code: ", resp.StatusCode)
451                                                 return true, responseJson
452                                         }
453                                 }
454                         } else {
455                                 log.Error("Bad response, method: ", method, " url: ", url, " resp: ", resp.StatusCode, " resp: ", resp)
456                         }
457                 }
458         }
459         return false, nil
460
461 }
462
463 func retrive_token_strimzi() {
464         log.Debug("Get token inline - strimzi comm")
465
466         conf := &clientcredentials.Config{
467                 ClientID:     creds_client_id,
468                 ClientSecret: creds_client_secret,
469                 TokenURL:     creds_service_url,
470         }
471         var modExpiry = time.Now()
472         ok := false
473         for !ok {
474                 token, err := conf.Token(context.Background())
475                 if err != nil {
476                         log.Warning("Cannot fetch access token: ", err, " - retrying ")
477                         time.Sleep(time.Second)
478                         continue
479                 }
480                 log.Debug("token: ", token)
481                 log.Debug("TokenValue: ", token.AccessToken)
482                 log.Debug("Expiration: ", token.Expiry)
483                 modExpiry = token.Expiry.Add(-time.Minute)
484                 log.Debug("Modified expiration: ", modExpiry)
485                 currentToken = token.AccessToken
486                 ok = true
487         }
488         log.Debug("Initial token ok")
489         diff := modExpiry.Sub(time.Now())
490         go func() {
491                 select {
492                 case <-time.After(diff):
493                         for !ok {
494                                 token, err := conf.Token(context.Background())
495                                 if err != nil {
496                                         log.Warning("Cannot fetch access token: ", err, " - retrying ")
497                                         time.Sleep(time.Second)
498                                         continue
499                                 }
500                                 log.Debug("token: ", token)
501                                 log.Debug("TokenValue: ", token.AccessToken)
502                                 log.Debug("Expiration: ", token.Expiry)
503                                 modExpiry = token.Expiry.Add(-time.Minute)
504                                 log.Debug("Modified expiration: ", modExpiry)
505                                 currentToken = token.AccessToken
506                                 ok = true
507                         }
508                         diff = modExpiry.Sub(time.Now())
509                 }
510         }()
511 }
512
513 func retrive_token(c *kafka.Consumer) {
514         log.Debug("Get token inline")
515         conf := &clientcredentials.Config{
516                 ClientID:     creds_client_id,
517                 ClientSecret: creds_client_secret,
518                 TokenURL:     creds_service_url,
519         }
520         token, err := conf.Token(context.Background())
521         if err != nil {
522                 log.Warning("Cannot fetch access token: ", err)
523                 c.SetOAuthBearerTokenFailure(err.Error())
524                 return
525         }
526         extensions := map[string]string{}
527         log.Debug("token: ", token)
528         log.Debug("TokenValue: ", token.AccessToken)
529         log.Debug("Expiration: ", token.Expiry)
530         t := token.Expiry.Add(-time.Minute)
531         log.Debug("Modified expiration: ", t)
532         oauthBearerToken := kafka.OAuthBearerToken{
533                 TokenValue: token.AccessToken,
534                 Expiration: t,
535                 Extensions: extensions,
536         }
537         log.Debug("Setting new token to consumer")
538         setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
539         currentToken = token.AccessToken
540         if setTokenError != nil {
541                 log.Warning("Cannot cannot set token in client: ", setTokenError)
542                 c.SetOAuthBearerTokenFailure(setTokenError.Error())
543         }
544 }
545
546 func gzipWrite(w io.Writer, data *[]byte) error {
547         gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
548
549         if err1 != nil {
550                 return err1
551         }
552         defer gw.Close()
553         _, err2 := gw.Write(*data)
554         return err2
555 }
556
557 func read_bridge_messages() {
558
559         consumer_type = "unsecure strimzi bridge consumer"
560         if creds_service_url != "" {
561                 consumer_type = "accesstoken strimzi bridge consumer"
562         }
563         ok := false
564         log.Debug("Cleaning consumer "+cid+" in group: ", gid)
565         ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
566         if !ok {
567                 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - it may not exist - ok")
568         }
569         var bridge_base_url = ""
570         ok = false
571         json_str := "{\"name\": \"" + cid + "\", \"auto.offset.reset\": \"latest\",\"format\": \"json\"}"
572         for !ok {
573                 log.Debug("Creating consumer "+cid+" in group: ", gid)
574                 var respJson map[string]interface{}
575                 ok, respJson = send_http_request([]byte(json_str), http.MethodPost, bootstrapserver+"/consumers/"+gid, "application/vnd.kafka.v2+json", currentToken, 409, true) //409 if consumer already exists
576                 if ok {
577                         bridge_base_url = fmt.Sprintf("%s", respJson["base_uri"])
578                 } else {
579                         log.Info("Failed create consumer "+cid+" in group: ", gid, " - retrying")
580                         time.Sleep(time.Second)
581                 }
582         }
583
584         ok = false
585         json_str = "{\"topics\": [\"" + topic + "\"]}"
586
587         for !ok {
588                 log.Debug("Subscribing to topic: ", topic)
589                 ok, _ = send_http_request([]byte(json_str), http.MethodPost, bridge_base_url+"/subscription", "application/vnd.kafka.v2+json", currentToken, 0, false)
590                 if !ok {
591                         log.Info("Failed subscribe to topic: ", topic, " - retrying")
592                         time.Sleep(time.Second)
593                 }
594         }
595
596         for true {
597                 log.Debug("Reading messages on topic: ", topic)
598
599                 var req *http.Request
600                 var err error
601                 url := bridge_base_url + "/records"
602
603                 req, err = http.NewRequest(http.MethodGet, url, nil)
604                 if err != nil {
605                         log.Error("Cannot create http request method: GET, url: ", url)
606                         time.Sleep(1 * time.Second)
607                         continue
608                 }
609                 req.Header.Set("accept", "application/vnd.kafka.json.v2+json")
610
611                 if creds_service_url != "" {
612                         if currentToken != "" {
613                                 req.Header.Add("authorization", currentToken)
614                         } else {
615                                 log.Error("Cannot create http request for url: ", url, " - token missing")
616                                 time.Sleep(1 * time.Second)
617                                 continue
618                         }
619                 }
620
621                 values := req.URL.Query()
622                 values.Add("timeout", "10000")
623                 req.URL.RawQuery = values.Encode()
624
625                 log.Debug(req)
626
627                 resp, err2 := httpclient.Do(req)
628                 if err2 != nil {
629                         log.Error("Cannot send http request, method: GET, url: ", url)
630                         time.Sleep(1 * time.Second)
631                         continue
632                 } else {
633                         body, err := ioutil.ReadAll(resp.Body)
634                         resp.Body.Close()
635                         if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
636                                 log.Debug("Accepted response code: ", resp.StatusCode)
637
638                                 if err != nil {
639                                         log.Error("Cannot read body, method: GET, url: ", url, " resp: ", resp.StatusCode)
640                                 } else {
641                                         var responseJson []interface{}
642                                         err := json.Unmarshal(body, &responseJson)
643                                         if err != nil {
644                                                 log.Error("Received msg not json? - cannot unmarshal")
645                                                 msg_corrupted_count++
646                                         } else {
647                                                 if len(responseJson) == 0 {
648                                                         log.Debug("No message")
649                                                         continue
650                                                 }
651                                                 for _, item := range responseJson {
652                                                         j, err := json.MarshalIndent(item, "", " ")
653                                                         if err != nil {
654                                                                 log.Error("Message in array not json? - cannot unmarshal")
655                                                                 msg_corrupted_count++
656                                                         } else {
657                                                                 msg_count++
658                                                                 if log_payload != "" {
659                                                                         fmt.Println("Message: " + string(j))
660                                                                 }
661                                                         }
662                                                 }
663                                         }
664                                 }
665
666                                 log.Debug("Commiting message")
667                                 ok, _ = send_http_request(nil, http.MethodPost, bridge_base_url+"/offsets", "", currentToken, 0, false)
668                                 if !ok {
669                                         log.Info("Failed to commit message")
670                                 }
671
672                         } else {
673                                 log.Error("Bad response, method: GET, url: ", url, " resp: ", resp.StatusCode)
674                                 log.Error("Bad response, data: ", string(body))
675                         }
676                 }
677         }
678
679 }
680
681 func read_kafka_messages() {
682         var c *kafka.Consumer = nil
683         log.Info("Creating kafka consumer...")
684         var err error
685         for c == nil {
686                 if jwt_file == "" && creds_service_url == "" {
687                         if ssl_path == "" {
688                                 log.Info("unsecure consumer")
689                                 consumer_type = "kafka unsecure consumer"
690                                 c, err = kafka.NewConsumer(&kafka.ConfigMap{
691                                         "bootstrap.servers": bootstrapserver,
692                                         "group.id":          gid,
693                                         "client.id":         cid,
694                                         "auto.offset.reset": "latest",
695                                 })
696                         } else {
697                                 log.Info("ssl consumer")
698                                 consumer_type = "kafka ssl consumer"
699                                 c, err = kafka.NewConsumer(&kafka.ConfigMap{
700                                         "bootstrap.servers":        bootstrapserver,
701                                         "group.id":                 gid,
702                                         "client.id":                cid,
703                                         "auto.offset.reset":        "latest",
704                                         "security.protocol":        "SSL",
705                                         "ssl.key.location":         ssl_path + "/clt.key",
706                                         "ssl.certificate.location": ssl_path + "/clt.crt",
707                                         "ssl.ca.location":          ssl_path + "/ca.crt",
708                                 })
709                         }
710                 } else {
711                         if ssl_path != "" {
712                                 panic("SSL cannot be configued with JWT_FILE or RAPP_AUTH_SERVICE_URL")
713                         }
714                         log.Info("sasl consumer")
715                         consumer_type = "kafka sasl unsecure consumer"
716                         c, err = kafka.NewConsumer(&kafka.ConfigMap{
717                                 "bootstrap.servers": bootstrapserver,
718                                 "group.id":          gid,
719                                 "client.id":         cid,
720                                 "auto.offset.reset": "latest",
721                                 "sasl.mechanism":    "OAUTHBEARER",
722                                 "security.protocol": "SASL_PLAINTEXT",
723                         })
724                 }
725                 if err != nil {
726                         log.Warning("Cannot create kafka consumer - retrying, error: ", err)
727                         time.Sleep(1 * time.Second)
728                 }
729         }
730
731         log.Info("Creating kafka consumer - ok")
732         log.Info("Start subscribing to topic: ", topic)
733         topic_ok := false
734         for !topic_ok {
735                 err = c.SubscribeTopics([]string{topic}, nil)
736                 if err != nil {
737                         log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying --  error details: ", err)
738                 } else {
739                         log.Info("Topic reader subscribing on topic: ", topic)
740                         topic_ok = true
741                 }
742         }
743
744         fileModTime := time.Now()
745         for {
746                 if jwt_file != "" {
747                         fileInfo, err := os.Stat(jwt_file)
748                         if err == nil {
749                                 if fileModTime != fileInfo.ModTime() {
750                                         log.Debug("JWT file is updated")
751                                         fileModTime = fileInfo.ModTime()
752                                         fileBytes, err := ioutil.ReadFile(jwt_file)
753                                         if err != nil {
754                                                 log.Error("JWT file read error: ", err)
755                                         } else {
756                                                 fileString := string(fileBytes)
757                                                 log.Info("JWT: ", fileString)
758                                                 t := time.Now()
759                                                 t15 := time.Second * 300
760                                                 t = t.Add(t15)
761                                                 oauthBearerToken := kafka.OAuthBearerToken{
762                                                         TokenValue: fileString,
763                                                         Expiration: t,
764                                                 }
765                                                 log.Debug("Setting new token to consumer")
766                                                 setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
767                                                 if setTokenError != nil {
768                                                         log.Warning("Cannot cannot set token in client: ", setTokenError)
769                                                 }
770                                         }
771                                 } else {
772                                         log.Debug("JWT file not updated - OK")
773                                 }
774                         } else {
775                                 log.Error("JWT does not exist: ", err)
776                         }
777                 }
778                 ev := c.Poll(1000)
779                 if ev == nil {
780                         log.Debug(" Nothing to consume on topic: ", topic)
781                         continue
782                 }
783                 switch e := ev.(type) {
784                 case *kafka.Message:
785                         var pdata *[]byte = &e.Value
786                         if gzipped_data != "" {
787                                 var buf bytes.Buffer
788                                 err = gzipWrite(&buf, pdata)
789                                 if err != nil {
790                                         log.Warning("Cannot unzip data")
791                                         pdata = nil
792                                 } else {
793                                         *pdata = buf.Bytes()
794                                         fmt.Println("Unzipped data")
795                                 }
796                         }
797                         if pdata != nil {
798                                 buf := &bytes.Buffer{}
799
800                                 if err := json.Indent(buf, *pdata, "", " "); err != nil {
801                                         log.Warning("Received msg not json?")
802                                 } else {
803                                         fmt.Println(buf.String())
804                                         msg_count++
805                                         fmt.Println("Number of received json msgs: " + strconv.Itoa(msg_count))
806                                 }
807                         }
808                         c.Commit()
809                 case kafka.Error:
810                         fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
811
812                 case kafka.OAuthBearerTokenRefresh:
813                         if jwt_file == "" {
814                                 oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
815                                 fmt.Println(oart)
816                                 if !ok {
817                                         continue
818                                 }
819                                 retrive_token(c)
820                         }
821                 default:
822                         fmt.Printf("Ignored %v\n", e)
823                 }
824
825         }
826 }