DFC shall provide a bearer authorization token in HTTP
[nonrtric/plt/ranpm.git] / pm-rapp / main.go
1 //  ============LICENSE_START===============================================
2 //  Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 //  ========================================================================
4 //  Licensed under the Apache License, Version 2.0 (the "License");
5 //  you may not use this file except in compliance with the License.
6 //  You may obtain a copy of the License at
7 //
8 //       http://www.apache.org/licenses/LICENSE-2.0
9 //
10 //  Unless required by applicable law or agreed to in writing, software
11 //  distributed under the License is distributed on an "AS IS" BASIS,
12 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 //  See the License for the specific language governing permissions and
14 //  limitations under the License.
15 //  ============LICENSE_END=================================================
16 //
17
18 package main
19
20 import (
21         "bytes"
22         "compress/gzip"
23         "context"
24         "encoding/json"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "net/http"
29         "net/http/pprof"
30         "os"
31         "os/signal"
32         "runtime"
33         "strconv"
34         "strings"
35         "syscall"
36         "time"
37
38         "github.com/confluentinc/confluent-kafka-go/kafka"
39         "github.com/gorilla/mux"
40         jsoniter "github.com/json-iterator/go"
41         log "github.com/sirupsen/logrus"
42         "golang.org/x/oauth2/clientcredentials"
43 )
44
45 type JobDefinition struct {
46         InfoTypeID    string `json:"info_type_id"`
47         JobOwner      string `json:"job_owner"`
48         JobResultURI  string `json:"job_result_uri"`
49         JobDefinition struct {
50                 KafkaOutputTopic string          `json:"kafkaOutputTopic"`
51                 FilterType       string          `json:"filterType"`
52                 Filter           json.RawMessage `json:"filter"`
53                 DeliveryInfo     struct {
54                         Topic            string `json:"topic"`
55                         BootStrapServers string `json:"bootStrapServers"`
56                 } `json:"deliveryInfo"`
57         } `json:"job_definition"`
58 }
59
60 const jobdef = "/config/jobDefinition.json"
61
62 var rapp_id = os.Getenv("APPID")
63
64 var rapp_ns = os.Getenv("APPNS")
65
66 var bootstrapserver = os.Getenv("KAFKA_SERVER")
67
68 var topic = os.Getenv("TOPIC")
69
70 var ics_server = os.Getenv("ICS")
71
72 var jwt_file = os.Getenv("JWT_FILE")
73
74 var ssl_path = os.Getenv("SSLPATH")
75
76 var gzipped_data = os.Getenv("GZIP")
77
78 var log_payload = os.Getenv("LOG_PAYLOAD")
79
80 // These are optional - if rapp is fethcing the token instead of the side car
81 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
82 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
83 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
84 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
85
86 var gid = ""
87 var cid = "cid-0"
88
89 var msg_count int = 0
90 var msg_corrupted_count int = 0
91
92 var jobid = "<not-set>"
93 var consumer_type = "<not-set>"
94
95 var currentToken = ""
96
97 var appStatus = "INIT"
98
99 var msg_per_sec int = 0
100
101 var httpclient = &http.Client{}
102
103 // == Main ==//
104 func main() {
105
106         log.SetLevel(log.InfoLevel)
107         log.SetLevel(log.DebugLevel)
108
109         log.Info("Server starting...")
110
111         if creds_service_url != "" {
112                 log.Warn("Disabling jwt retrieval from side car")
113                 jwt_file = ""
114         }
115
116         if rapp_id == "" {
117                 log.Error("Env APPID not set")
118                 os.Exit(1)
119         }
120
121         if rapp_ns == "" {
122                 log.Error("Env APPNS not set")
123                 os.Exit(1)
124         }
125
126         if bootstrapserver == "" {
127                 log.Error("Env KAFKA_SERVER not set")
128                 os.Exit(1)
129         }
130
131         if topic == "" {
132                 log.Error("Env TOPIC not set")
133                 os.Exit(1)
134         }
135
136         if ics_server == "" {
137                 log.Error("Env ICS not set")
138                 os.Exit(1)
139         }
140
141         rtr := mux.NewRouter()
142         rtr.HandleFunc("/statistics", statistics)
143         rtr.HandleFunc("/status", status)
144         rtr.HandleFunc("/logging/{level}", logging_level)
145         rtr.HandleFunc("/logging", logging_level)
146         rtr.HandleFunc("/", alive)
147
148         //For perf/mem profiling
149         rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
150
151         http.Handle("/", rtr)
152
153         fileBytes, err := os.ReadFile(jobdef)
154         if err != nil {
155                 log.Error("Cannot read job defintion file: ", jobdef, err)
156                 os.Exit(1)
157         }
158         fmt.Println("FROM FILE")
159         fmt.Println(string(fileBytes))
160
161         job_json := JobDefinition{}
162         err = jsoniter.Unmarshal([]byte(fileBytes), &job_json)
163         if err != nil {
164                 log.Error("Cannot parse job defintion file: ", jobdef, err)
165                 os.Exit(1)
166         }
167         job_type := job_json.InfoTypeID
168         job_json.JobDefinition.KafkaOutputTopic = topic
169         job_json.JobDefinition.DeliveryInfo.Topic = topic
170         job_json.JobDefinition.DeliveryInfo.BootStrapServers = bootstrapserver
171
172         gid = "pm-rapp-" + job_type + "-" + rapp_id
173
174         jobid = "rapp-job-" + job_type + "-" + rapp_id
175
176         json_bytes, err := json.Marshal(job_json)
177         if err != nil {
178                 log.Error("Cannot marshal job json", err)
179                 os.Exit(1)
180         }
181
182         json_str := string(json_bytes)
183
184         if strings.HasPrefix(bootstrapserver, "http://") {
185                 if creds_service_url != "" {
186                         consumer_type = "accesstoken strimzi bridge consumer"
187                         retrive_token_strimzi()
188                 }
189         } else {
190                 go read_kafka_messages()
191         }
192
193         ok := false
194         if ics_server != "" {
195                 for !ok {
196                         log.Debug("Registring job: ", jobid, " json: ", json_str)
197                         ok, _ = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
198                         if !ok {
199                                 log.Info("Failed to register job: ", jobid, " - retrying")
200                                 time.Sleep(time.Second)
201                         }
202                 }
203         } else {
204                 log.Info("No job registered - read from topic only")
205         }
206         if strings.HasPrefix(bootstrapserver, "http://") {
207                 go read_bridge_messages()
208         }
209
210         go calc_average()
211
212         http_port := "80"
213         http_server := &http.Server{Addr: ":" + http_port, Handler: nil}
214
215         sigs := make(chan os.Signal, 1)
216         signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
217         go func() {
218                 fmt.Println("Setting handler for signal sigint and sigterm")
219                 sig := <-sigs
220                 appStatus = "TERMINATING"
221                 fmt.Printf("Received signal %s - application will terminate\n", sig)
222
223                 if strings.HasPrefix(bootstrapserver, "http://") {
224                         log.Debug("stopping strimzi consumer for job: ", jobid)
225                         ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
226                         if !ok {
227                                 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - retrying")
228                         }
229                 }
230
231                 ok := false
232                 if ics_server != "" {
233                         for !ok {
234                                 log.Debug("stopping job: ", jobid, " json: ", json_str)
235                                 ok, _ = send_http_request(nil, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
236                                 if !ok {
237                                         log.Info("Failed to stop job: ", jobid, " - retrying")
238                                         time.Sleep(time.Second)
239                                 }
240                         }
241                 }
242                 http_server.Shutdown(context.Background())
243         }()
244         appStatus = "RUNNING"
245         log.Info("Starting http service...")
246         err = http_server.ListenAndServe()
247         if err == http.ErrServerClosed { // graceful shutdown
248                 log.Info("http server shutdown...")
249                 os.Exit(1)
250         } else if err != nil {
251                 log.Error("http server error: ", err)
252                 log.Info("http server shutdown...")
253                 os.Exit(1)
254         }
255
256         //Wait until all go routines has exited
257         runtime.Goexit()
258
259         log.Warn("main routine exit")
260         log.Warn("server is stopping...")
261 }
262
263 // Simple alive check
264 func alive(w http.ResponseWriter, req *http.Request) {
265         //Alive check
266 }
267
268 // Get/Set logging level
269 func logging_level(w http.ResponseWriter, req *http.Request) {
270         vars := mux.Vars(req)
271         if level, ok := vars["level"]; ok {
272                 if req.Method == http.MethodPut {
273                         switch level {
274                         case "trace":
275                                 log.SetLevel(log.TraceLevel)
276                         case "debug":
277                                 log.SetLevel(log.DebugLevel)
278                         case "info":
279                                 log.SetLevel(log.InfoLevel)
280                         case "warn":
281                                 log.SetLevel(log.WarnLevel)
282                         case "error":
283                                 log.SetLevel(log.ErrorLevel)
284                         case "fatal":
285                                 log.SetLevel(log.FatalLevel)
286                         case "panic":
287                                 log.SetLevel(log.PanicLevel)
288                         default:
289                                 w.WriteHeader(http.StatusNotFound)
290                         }
291                 } else {
292                         w.WriteHeader(http.StatusMethodNotAllowed)
293                 }
294         } else {
295                 if req.Method == http.MethodGet {
296                         msg := "none"
297                         if log.IsLevelEnabled(log.PanicLevel) {
298                                 msg = "panic"
299                         } else if log.IsLevelEnabled(log.FatalLevel) {
300                                 msg = "fatal"
301                         } else if log.IsLevelEnabled(log.ErrorLevel) {
302                                 msg = "error"
303                         } else if log.IsLevelEnabled(log.WarnLevel) {
304                                 msg = "warn"
305                         } else if log.IsLevelEnabled(log.InfoLevel) {
306                                 msg = "info"
307                         } else if log.IsLevelEnabled(log.DebugLevel) {
308                                 msg = "debug"
309                         } else if log.IsLevelEnabled(log.TraceLevel) {
310                                 msg = "trace"
311                         }
312                         w.Header().Set("Content-Type", "application/text")
313                         w.Write([]byte(msg))
314                 } else {
315                         w.WriteHeader(http.StatusMethodNotAllowed)
316                 }
317         }
318 }
319
320 // Get app state
321 func status(w http.ResponseWriter, req *http.Request) {
322         if req.Method != http.MethodGet {
323                 w.WriteHeader(http.StatusMethodNotAllowed)
324                 return
325         }
326
327         _, err := w.Write([]byte(appStatus))
328         if err != nil {
329                 w.WriteHeader(http.StatusInternalServerError)
330                 log.Error("Cannot send statistics json")
331                 return
332         }
333 }
334
335 // producer statictics, all jobs
336 func statistics(w http.ResponseWriter, req *http.Request) {
337         if req.Method != http.MethodGet {
338                 w.WriteHeader(http.StatusMethodNotAllowed)
339                 return
340         }
341         m := make(map[string]interface{})
342         log.Debug("rapp statictics")
343
344         req.Header.Set("Content-Type", "application/json; charset=utf-8")
345         m["number-of-messages"] = strconv.Itoa(msg_count)
346         m["number-of-corrupted-messages"] = strconv.Itoa(msg_corrupted_count)
347         m["job id"] = jobid
348         m["group id"] = gid
349         m["client id"] = cid
350         m["kafka consumer type"] = consumer_type
351         m["server"] = bootstrapserver
352         m["topic"] = topic
353         m["messages per sec"] = msg_per_sec
354
355         json, err := json.Marshal(m)
356         if err != nil {
357                 w.WriteHeader(http.StatusInternalServerError)
358                 log.Error("Cannot marshal statistics json")
359                 return
360         }
361         _, err = w.Write(json)
362         if err != nil {
363                 w.WriteHeader(http.StatusInternalServerError)
364                 log.Error("Cannot send statistics json")
365                 return
366         }
367 }
368
369 func calc_average() {
370
371         for true {
372                 v := msg_count
373                 time.Sleep(60 * time.Second)
374                 msg_per_sec = (msg_count - v) / 60
375         }
376 }
377
378 func send_http_request(jsonData []byte, method string, url string, contentType string, accessToken string, alt_ok_response int, returnJson bool) (bool, map[string]interface{}) {
379
380         var req *http.Request
381         var err error
382         if jsonData != nil {
383                 req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonData))
384                 if err != nil {
385                         log.Error("Cannot create http request method: ", method, " url: ", url)
386                         return false, nil
387                 }
388                 if contentType == "" {
389                         req.Header.Set("Content-Type", "application/json; charset=utf-8")
390                 } else {
391                         req.Header.Set("Content-Type", contentType)
392                 }
393         } else {
394                 req, err = http.NewRequest(method, url, nil)
395                 if err != nil {
396                         log.Error("Cannot create http request method: ", method, " url: ", url)
397                         return false, nil
398                 }
399         }
400         if jwt_file != "" || creds_service_url != "" {
401                 if accessToken != "" {
402                         req.Header.Set("Authorization", "Bearer "+accessToken)
403                 } else {
404                         log.Error("Cannot create http request for url: ", url, " - token missing")
405                         return false, nil
406                 }
407         }
408         log.Debug("Http request: ", req)
409         resp, err2 := httpclient.Do(req)
410         if err2 != nil {
411                 log.Error("Cannot send http request, method: ", method, "url: ", url)
412         } else {
413                 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
414
415                         if returnJson {
416                                 defer resp.Body.Close()
417                                 body, err3 := ioutil.ReadAll(resp.Body)
418                                 if err3 != nil {
419                                         log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
420                                         return false, nil
421                                 } else {
422                                         var responseJson map[string]interface{}
423                                         err := json.Unmarshal(body, &responseJson)
424                                         if err != nil {
425                                                 log.Error("Received msg not json? - cannot unmarshal")
426                                                 return false, nil
427                                         }
428                                         fmt.Println(string(body))
429                                         log.Debug("Accepted response code: ", resp.StatusCode)
430                                         return true, responseJson
431                                 }
432                         }
433
434                         log.Debug("Accepted response code: ", resp.StatusCode)
435                         return true, nil
436                 } else {
437                         if alt_ok_response != 0 && resp.StatusCode == alt_ok_response {
438
439                                 if returnJson {
440                                         defer resp.Body.Close()
441                                         body, err3 := ioutil.ReadAll(resp.Body)
442                                         if err3 != nil {
443                                                 log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
444                                                 return false, nil
445                                         } else {
446                                                 var responseJson map[string]interface{}
447                                                 err := json.Unmarshal(body, &responseJson)
448                                                 if err != nil {
449                                                         log.Error("Received msg not json? - cannot unmarshal")
450                                                         return false, nil
451                                                 }
452                                                 fmt.Println(string(body))
453                                                 log.Debug("Accepted alternative response code: ", resp.StatusCode)
454                                                 return true, responseJson
455                                         }
456                                 }
457                         } else {
458                                 log.Error("Bad response, method: ", method, " url: ", url, " resp: ", resp.StatusCode, " resp: ", resp)
459                         }
460                 }
461         }
462         return false, nil
463
464 }
465
466 func retrive_token_strimzi() {
467         log.Debug("Get token inline - strimzi comm")
468
469         conf := &clientcredentials.Config{
470                 ClientID:     creds_client_id,
471                 ClientSecret: creds_client_secret,
472                 TokenURL:     creds_service_url,
473         }
474         var modExpiry = time.Now()
475         ok := false
476         for !ok {
477                 token, err := conf.Token(context.Background())
478                 if err != nil {
479                         log.Warning("Cannot fetch access token: ", err, " - retrying ")
480                         time.Sleep(time.Second)
481                         continue
482                 }
483                 log.Debug("token: ", token)
484                 log.Debug("TokenValue: ", token.AccessToken)
485                 log.Debug("Expiration: ", token.Expiry)
486                 modExpiry = token.Expiry.Add(-time.Minute)
487                 log.Debug("Modified expiration: ", modExpiry)
488                 currentToken = token.AccessToken
489                 ok = true
490         }
491         log.Debug("Initial token ok")
492         diff := modExpiry.Sub(time.Now())
493         go func() {
494                 select {
495                 case <-time.After(diff):
496                         for !ok {
497                                 token, err := conf.Token(context.Background())
498                                 if err != nil {
499                                         log.Warning("Cannot fetch access token: ", err, " - retrying ")
500                                         time.Sleep(time.Second)
501                                         continue
502                                 }
503                                 log.Debug("token: ", token)
504                                 log.Debug("TokenValue: ", token.AccessToken)
505                                 log.Debug("Expiration: ", token.Expiry)
506                                 modExpiry = token.Expiry.Add(-time.Minute)
507                                 log.Debug("Modified expiration: ", modExpiry)
508                                 currentToken = token.AccessToken
509                                 ok = true
510                         }
511                         diff = modExpiry.Sub(time.Now())
512                 }
513         }()
514 }
515
516 func retrive_token(c *kafka.Consumer) {
517         log.Debug("Get token inline")
518         conf := &clientcredentials.Config{
519                 ClientID:     creds_client_id,
520                 ClientSecret: creds_client_secret,
521                 TokenURL:     creds_service_url,
522         }
523         token, err := conf.Token(context.Background())
524         if err != nil {
525                 log.Warning("Cannot fetch access token: ", err)
526                 c.SetOAuthBearerTokenFailure(err.Error())
527                 return
528         }
529         extensions := map[string]string{}
530         log.Debug("token: ", token)
531         log.Debug("TokenValue: ", token.AccessToken)
532         log.Debug("Expiration: ", token.Expiry)
533         t := token.Expiry.Add(-time.Minute)
534         log.Debug("Modified expiration: ", t)
535         oauthBearerToken := kafka.OAuthBearerToken{
536                 TokenValue: token.AccessToken,
537                 Expiration: t,
538                 Extensions: extensions,
539         }
540         log.Debug("Setting new token to consumer")
541         setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
542         currentToken = token.AccessToken
543         if setTokenError != nil {
544                 log.Warning("Cannot cannot set token in client: ", setTokenError)
545                 c.SetOAuthBearerTokenFailure(setTokenError.Error())
546         }
547 }
548
549 func gzipWrite(w io.Writer, data *[]byte) error {
550         gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
551
552         if err1 != nil {
553                 return err1
554         }
555         defer gw.Close()
556         _, err2 := gw.Write(*data)
557         return err2
558 }
559
560 func read_bridge_messages() {
561
562         consumer_type = "unsecure strimzi bridge consumer"
563         if creds_service_url != "" {
564                 consumer_type = "accesstoken strimzi bridge consumer"
565         }
566         ok := false
567         log.Debug("Cleaning consumer "+cid+" in group: ", gid)
568         ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
569         if !ok {
570                 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - it may not exist - ok")
571         }
572         var bridge_base_url = ""
573         ok = false
574         json_str := "{\"name\": \"" + cid + "\", \"auto.offset.reset\": \"latest\",\"format\": \"json\"}"
575         for !ok {
576                 log.Debug("Creating consumer "+cid+" in group: ", gid)
577                 var respJson map[string]interface{}
578                 ok, respJson = send_http_request([]byte(json_str), http.MethodPost, bootstrapserver+"/consumers/"+gid, "application/vnd.kafka.v2+json", currentToken, 409, true) //409 if consumer already exists
579                 if ok {
580                         bridge_base_url = fmt.Sprintf("%s", respJson["base_uri"])
581                 } else {
582                         log.Info("Failed create consumer "+cid+" in group: ", gid, " - retrying")
583                         time.Sleep(time.Second)
584                 }
585         }
586
587         ok = false
588         json_str = "{\"topics\": [\"" + topic + "\"]}"
589
590         for !ok {
591                 log.Debug("Subscribing to topic: ", topic)
592                 ok, _ = send_http_request([]byte(json_str), http.MethodPost, bridge_base_url+"/subscription", "application/vnd.kafka.v2+json", currentToken, 0, false)
593                 if !ok {
594                         log.Info("Failed subscribe to topic: ", topic, " - retrying")
595                         time.Sleep(time.Second)
596                 }
597         }
598
599         for true {
600                 log.Debug("Reading messages on topic: ", topic)
601
602                 var req *http.Request
603                 var err error
604                 url := bridge_base_url + "/records"
605
606                 req, err = http.NewRequest(http.MethodGet, url, nil)
607                 if err != nil {
608                         log.Error("Cannot create http request method: GET, url: ", url)
609                         time.Sleep(1 * time.Second)
610                         continue
611                 }
612                 req.Header.Set("accept", "application/vnd.kafka.json.v2+json")
613
614                 if creds_service_url != "" {
615                         if currentToken != "" {
616                                 req.Header.Add("authorization", currentToken)
617                         } else {
618                                 log.Error("Cannot create http request for url: ", url, " - token missing")
619                                 time.Sleep(1 * time.Second)
620                                 continue
621                         }
622                 }
623
624                 values := req.URL.Query()
625                 values.Add("timeout", "10000")
626                 req.URL.RawQuery = values.Encode()
627
628                 log.Debug(req)
629
630                 resp, err2 := httpclient.Do(req)
631                 if err2 != nil {
632                         log.Error("Cannot send http request, method: GET, url: ", url)
633                         time.Sleep(1 * time.Second)
634                         continue
635                 } else {
636                         body, err := ioutil.ReadAll(resp.Body)
637                         resp.Body.Close()
638                         if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
639                                 log.Debug("Accepted response code: ", resp.StatusCode)
640
641                                 if err != nil {
642                                         log.Error("Cannot read body, method: GET, url: ", url, " resp: ", resp.StatusCode)
643                                 } else {
644                                         var responseJson []interface{}
645                                         err := json.Unmarshal(body, &responseJson)
646                                         if err != nil {
647                                                 log.Error("Received msg not json? - cannot unmarshal")
648                                                 msg_corrupted_count++
649                                         } else {
650                                                 if len(responseJson) == 0 {
651                                                         log.Debug("No message")
652                                                         continue
653                                                 }
654                                                 for _, item := range responseJson {
655                                                         j, err := json.MarshalIndent(item, "", " ")
656                                                         if err != nil {
657                                                                 log.Error("Message in array not json? - cannot unmarshal")
658                                                                 msg_corrupted_count++
659                                                         } else {
660                                                                 msg_count++
661                                                                 if log_payload != "" {
662                                                                         fmt.Println("Message: " + string(j))
663                                                                 }
664                                                         }
665                                                 }
666                                         }
667                                 }
668
669                                 log.Debug("Commiting message")
670                                 ok, _ = send_http_request(nil, http.MethodPost, bridge_base_url+"/offsets", "", currentToken, 0, false)
671                                 if !ok {
672                                         log.Info("Failed to commit message")
673                                 }
674
675                         } else {
676                                 log.Error("Bad response, method: GET, url: ", url, " resp: ", resp.StatusCode)
677                                 log.Error("Bad response, data: ", string(body))
678                         }
679                 }
680         }
681
682 }
683
684 func read_kafka_messages() {
685         var c *kafka.Consumer = nil
686         log.Info("Creating kafka consumer...")
687         var err error
688         for c == nil {
689                 if jwt_file == "" && creds_service_url == "" {
690                         if ssl_path == "" {
691                                 log.Info("unsecure consumer")
692                                 consumer_type = "kafka unsecure consumer"
693                                 c, err = kafka.NewConsumer(&kafka.ConfigMap{
694                                         "bootstrap.servers": bootstrapserver,
695                                         "group.id":          gid,
696                                         "client.id":         cid,
697                                         "auto.offset.reset": "latest",
698                                 })
699                         } else {
700                                 log.Info("ssl consumer")
701                                 consumer_type = "kafka ssl consumer"
702                                 c, err = kafka.NewConsumer(&kafka.ConfigMap{
703                                         "bootstrap.servers":        bootstrapserver,
704                                         "group.id":                 gid,
705                                         "client.id":                cid,
706                                         "auto.offset.reset":        "latest",
707                                         "security.protocol":        "SSL",
708                                         "ssl.key.location":         ssl_path + "/clt.key",
709                                         "ssl.certificate.location": ssl_path + "/clt.crt",
710                                         "ssl.ca.location":          ssl_path + "/ca.crt",
711                                 })
712                         }
713                 } else {
714                         if ssl_path != "" {
715                                 panic("SSL cannot be configued with JWT_FILE or RAPP_AUTH_SERVICE_URL")
716                         }
717                         log.Info("sasl consumer")
718                         consumer_type = "kafka sasl unsecure consumer"
719                         c, err = kafka.NewConsumer(&kafka.ConfigMap{
720                                 "bootstrap.servers": bootstrapserver,
721                                 "group.id":          gid,
722                                 "client.id":         cid,
723                                 "auto.offset.reset": "latest",
724                                 "sasl.mechanism":    "OAUTHBEARER",
725                                 "security.protocol": "SASL_PLAINTEXT",
726                         })
727                 }
728                 if err != nil {
729                         log.Warning("Cannot create kafka consumer - retrying, error: ", err)
730                         time.Sleep(1 * time.Second)
731                 }
732         }
733
734         log.Info("Creating kafka consumer - ok")
735         log.Info("Start subscribing to topic: ", topic)
736         topic_ok := false
737         for !topic_ok {
738                 err = c.SubscribeTopics([]string{topic}, nil)
739                 if err != nil {
740                         log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying --  error details: ", err)
741                 } else {
742                         log.Info("Topic reader subscribing on topic: ", topic)
743                         topic_ok = true
744                 }
745         }
746
747         fileModTime := time.Now()
748         for {
749                 if jwt_file != "" {
750                         fileInfo, err := os.Stat(jwt_file)
751                         if err == nil {
752                                 if fileModTime != fileInfo.ModTime() {
753                                         log.Debug("JWT file is updated")
754                                         fileModTime = fileInfo.ModTime()
755                                         fileBytes, err := ioutil.ReadFile(jwt_file)
756                                         if err != nil {
757                                                 log.Error("JWT file read error: ", err)
758                                         } else {
759                                                 fileString := string(fileBytes)
760                                                 log.Info("JWT: ", fileString)
761                                                 t := time.Now()
762                                                 t15 := time.Second * 300
763                                                 t = t.Add(t15)
764                                                 oauthBearerToken := kafka.OAuthBearerToken{
765                                                         TokenValue: fileString,
766                                                         Expiration: t,
767                                                 }
768                                                 log.Debug("Setting new token to consumer")
769                                                 setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
770                                                 if setTokenError != nil {
771                                                         log.Warning("Cannot cannot set token in client: ", setTokenError)
772                                                 }
773                                         }
774                                 } else {
775                                         log.Debug("JWT file not updated - OK")
776                                 }
777                         } else {
778                                 log.Error("JWT does not exist: ", err)
779                         }
780                 }
781                 ev := c.Poll(1000)
782                 if ev == nil {
783                         log.Debug(" Nothing to consume on topic: ", topic)
784                         continue
785                 }
786                 switch e := ev.(type) {
787                 case *kafka.Message:
788                         var pdata *[]byte = &e.Value
789                         if gzipped_data != "" {
790                                 var buf bytes.Buffer
791                                 err = gzipWrite(&buf, pdata)
792                                 if err != nil {
793                                         log.Warning("Cannot unzip data")
794                                         pdata = nil
795                                 } else {
796                                         *pdata = buf.Bytes()
797                                         fmt.Println("Unzipped data")
798                                 }
799                         }
800                         if pdata != nil {
801                                 buf := &bytes.Buffer{}
802
803                                 if err := json.Indent(buf, *pdata, "", " "); err != nil {
804                                         log.Warning("Received msg not json?")
805                                 } else {
806                                         fmt.Println(buf.String())
807                                         msg_count++
808                                         fmt.Println("Number of received json msgs: " + strconv.Itoa(msg_count))
809                                 }
810                         }
811                         c.Commit()
812                 case kafka.Error:
813                         fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
814
815                 case kafka.OAuthBearerTokenRefresh:
816                         if jwt_file == "" {
817                                 oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
818                                 fmt.Println(oart)
819                                 if !ok {
820                                         continue
821                                 }
822                                 retrive_token(c)
823                         }
824                 default:
825                         fmt.Printf("Ignored %v\n", e)
826                 }
827
828         }
829 }