1 // ============LICENSE_START===============================================
2 // Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 // ========================================================================
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 // ============LICENSE_END=================================================
38 "github.com/confluentinc/confluent-kafka-go/kafka"
39 "github.com/gorilla/mux"
40 jsoniter "github.com/json-iterator/go"
41 log "github.com/sirupsen/logrus"
42 "golang.org/x/oauth2/clientcredentials"
45 type JobDefinition struct {
46 InfoTypeID string `json:"info_type_id"`
47 JobOwner string `json:"job_owner"`
48 StatusNotificationURI string `json:"status_notification_uri"`
49 JobDefinition struct {
50 Filter json.RawMessage `json:"filter"`
52 Topic string `json:"topic"`
53 BootStrapServers string `json:"bootStrapServers"`
54 } `json:"deliveryInfo"`
55 } `json:"job_definition"`
58 const jobdef = "/config/jobDefinition.json"
60 var rapp_id = os.Getenv("APPID")
62 var rapp_ns = os.Getenv("APPNS")
64 var bootstrapserver = os.Getenv("KAFKA_SERVER")
66 var topic = os.Getenv("TOPIC")
68 var ics_server = os.Getenv("ICS")
70 var jwt_file = os.Getenv("JWT_FILE")
72 var ssl_path = os.Getenv("SSLPATH")
74 var gzipped_data = os.Getenv("GZIP")
76 var log_payload = os.Getenv("LOG_PAYLOAD")
78 // These are optional - if rapp is fethcing the token instead of the side car
79 var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
80 var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
81 var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
82 var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
88 var msg_corrupted_count int = 0
90 var jobid = "<not-set>"
91 var consumer_type = "<not-set>"
95 var appStatus = "INIT"
97 var msg_per_sec int = 0
99 var httpclient = &http.Client{}
104 log.SetLevel(log.InfoLevel)
105 log.SetLevel(log.DebugLevel)
107 log.Info("Server starting...")
109 if creds_service_url != "" {
110 log.Warn("Disabling jwt retrieval from side car")
115 log.Error("Env APPID not set")
120 log.Error("Env APPNS not set")
124 if bootstrapserver == "" {
125 log.Error("Env KAFKA_SERVER not set")
130 log.Error("Env TOPIC not set")
134 if ics_server == "" {
135 log.Error("Env ICS not set")
139 rtr := mux.NewRouter()
140 rtr.HandleFunc("/statistics", statistics)
141 rtr.HandleFunc("/status", status)
142 rtr.HandleFunc("/logging/{level}", logging_level)
143 rtr.HandleFunc("/logging", logging_level)
144 rtr.HandleFunc("/", alive)
146 //For perf/mem profiling
147 rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
149 http.Handle("/", rtr)
151 fileBytes, err := os.ReadFile(jobdef)
153 log.Error("Cannot read job defintion file: ", jobdef, err)
156 fmt.Println("FROM FILE")
157 fmt.Println(string(fileBytes))
159 job_json := JobDefinition{}
160 err = jsoniter.Unmarshal([]byte(fileBytes), &job_json)
162 log.Error("Cannot parse job defintion file: ", jobdef, err)
165 job_type := job_json.InfoTypeID
166 job_json.JobDefinition.DeliveryInfo.Topic = topic
167 job_json.JobDefinition.DeliveryInfo.BootStrapServers = bootstrapserver
169 gid = "pm-rapp-" + job_type + "-" + rapp_id
171 jobid = "rapp-job-" + job_type + "-" + rapp_id
173 json_bytes, err := json.Marshal(job_json)
175 log.Error("Cannot marshal job json", err)
179 json_str := string(json_bytes)
181 if strings.HasPrefix(bootstrapserver, "http://") {
182 if creds_service_url != "" {
183 consumer_type = "accesstoken strimzi bridge consumer"
184 retrive_token_strimzi()
187 go read_kafka_messages()
191 if ics_server != "" {
193 log.Debug("Registring job: ", jobid, " json: ", json_str)
194 ok, _ = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
196 log.Info("Failed to register job: ", jobid, " - retrying")
197 time.Sleep(time.Second)
201 log.Info("No job registered - read from topic only")
203 if strings.HasPrefix(bootstrapserver, "http://") {
204 go read_bridge_messages()
210 http_server := &http.Server{Addr: ":" + http_port, Handler: nil}
212 sigs := make(chan os.Signal, 1)
213 signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
215 fmt.Println("Setting handler for signal sigint and sigterm")
217 appStatus = "TERMINATING"
218 fmt.Printf("Received signal %s - application will terminate\n", sig)
220 if strings.HasPrefix(bootstrapserver, "http://") {
221 log.Debug("stopping strimzi consumer for job: ", jobid)
222 ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
224 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - retrying")
229 if ics_server != "" {
231 log.Debug("stopping job: ", jobid, " json: ", json_str)
232 ok, _ = send_http_request(nil, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
234 log.Info("Failed to stop job: ", jobid, " - retrying")
235 time.Sleep(time.Second)
239 http_server.Shutdown(context.Background())
241 appStatus = "RUNNING"
242 log.Info("Starting http service...")
243 err = http_server.ListenAndServe()
244 if err == http.ErrServerClosed { // graceful shutdown
245 log.Info("http server shutdown...")
247 } else if err != nil {
248 log.Error("http server error: ", err)
249 log.Info("http server shutdown...")
253 //Wait until all go routines has exited
256 log.Warn("main routine exit")
257 log.Warn("server is stopping...")
260 // Simple alive check
261 func alive(w http.ResponseWriter, req *http.Request) {
265 // Get/Set logging level
266 func logging_level(w http.ResponseWriter, req *http.Request) {
267 vars := mux.Vars(req)
268 if level, ok := vars["level"]; ok {
269 if req.Method == http.MethodPut {
272 log.SetLevel(log.TraceLevel)
274 log.SetLevel(log.DebugLevel)
276 log.SetLevel(log.InfoLevel)
278 log.SetLevel(log.WarnLevel)
280 log.SetLevel(log.ErrorLevel)
282 log.SetLevel(log.FatalLevel)
284 log.SetLevel(log.PanicLevel)
286 w.WriteHeader(http.StatusNotFound)
289 w.WriteHeader(http.StatusMethodNotAllowed)
292 if req.Method == http.MethodGet {
294 if log.IsLevelEnabled(log.PanicLevel) {
296 } else if log.IsLevelEnabled(log.FatalLevel) {
298 } else if log.IsLevelEnabled(log.ErrorLevel) {
300 } else if log.IsLevelEnabled(log.WarnLevel) {
302 } else if log.IsLevelEnabled(log.InfoLevel) {
304 } else if log.IsLevelEnabled(log.DebugLevel) {
306 } else if log.IsLevelEnabled(log.TraceLevel) {
309 w.Header().Set("Content-Type", "application/text")
312 w.WriteHeader(http.StatusMethodNotAllowed)
318 func status(w http.ResponseWriter, req *http.Request) {
319 if req.Method != http.MethodGet {
320 w.WriteHeader(http.StatusMethodNotAllowed)
324 _, err := w.Write([]byte(appStatus))
326 w.WriteHeader(http.StatusInternalServerError)
327 log.Error("Cannot send statistics json")
332 // producer statictics, all jobs
333 func statistics(w http.ResponseWriter, req *http.Request) {
334 if req.Method != http.MethodGet {
335 w.WriteHeader(http.StatusMethodNotAllowed)
338 m := make(map[string]interface{})
339 log.Debug("rapp statictics")
341 req.Header.Set("Content-Type", "application/json; charset=utf-8")
342 m["number-of-messages"] = strconv.Itoa(msg_count)
343 m["number-of-corrupted-messages"] = strconv.Itoa(msg_corrupted_count)
347 m["kafka consumer type"] = consumer_type
348 m["server"] = bootstrapserver
350 m["messages per sec"] = msg_per_sec
352 json, err := json.Marshal(m)
354 w.WriteHeader(http.StatusInternalServerError)
355 log.Error("Cannot marshal statistics json")
358 _, err = w.Write(json)
360 w.WriteHeader(http.StatusInternalServerError)
361 log.Error("Cannot send statistics json")
366 func calc_average() {
370 time.Sleep(60 * time.Second)
371 msg_per_sec = (msg_count - v) / 60
375 func send_http_request(jsonData []byte, method string, url string, contentType string, accessToken string, alt_ok_response int, returnJson bool) (bool, map[string]interface{}) {
377 var req *http.Request
380 req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonData))
382 log.Error("Cannot create http request method: ", method, " url: ", url)
385 if contentType == "" {
386 req.Header.Set("Content-Type", "application/json; charset=utf-8")
388 req.Header.Set("Content-Type", contentType)
391 req, err = http.NewRequest(method, url, nil)
393 log.Error("Cannot create http request method: ", method, " url: ", url)
397 if jwt_file != "" || creds_service_url != "" {
398 if accessToken != "" {
399 req.Header.Set("Authorization", "Bearer "+accessToken)
401 log.Error("Cannot create http request for url: ", url, " - token missing")
405 log.Debug("Http request: ", req)
406 resp, err2 := httpclient.Do(req)
408 log.Error("Cannot send http request, method: ", method, "url: ", url)
410 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
413 defer resp.Body.Close()
414 body, err3 := ioutil.ReadAll(resp.Body)
416 log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
419 var responseJson map[string]interface{}
420 err := json.Unmarshal(body, &responseJson)
422 log.Error("Received msg not json? - cannot unmarshal")
425 fmt.Println(string(body))
426 log.Debug("Accepted response code: ", resp.StatusCode)
427 return true, responseJson
431 log.Debug("Accepted response code: ", resp.StatusCode)
434 if alt_ok_response != 0 && resp.StatusCode == alt_ok_response {
437 defer resp.Body.Close()
438 body, err3 := ioutil.ReadAll(resp.Body)
440 log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
443 var responseJson map[string]interface{}
444 err := json.Unmarshal(body, &responseJson)
446 log.Error("Received msg not json? - cannot unmarshal")
449 fmt.Println(string(body))
450 log.Debug("Accepted alternative response code: ", resp.StatusCode)
451 return true, responseJson
455 log.Error("Bad response, method: ", method, " url: ", url, " resp: ", resp.StatusCode, " resp: ", resp)
463 func retrive_token_strimzi() {
464 log.Debug("Get token inline - strimzi comm")
466 conf := &clientcredentials.Config{
467 ClientID: creds_client_id,
468 ClientSecret: creds_client_secret,
469 TokenURL: creds_service_url,
471 var modExpiry = time.Now()
474 token, err := conf.Token(context.Background())
476 log.Warning("Cannot fetch access token: ", err, " - retrying ")
477 time.Sleep(time.Second)
480 log.Debug("token: ", token)
481 log.Debug("TokenValue: ", token.AccessToken)
482 log.Debug("Expiration: ", token.Expiry)
483 modExpiry = token.Expiry.Add(-time.Minute)
484 log.Debug("Modified expiration: ", modExpiry)
485 currentToken = token.AccessToken
488 log.Debug("Initial token ok")
489 diff := modExpiry.Sub(time.Now())
492 case <-time.After(diff):
494 token, err := conf.Token(context.Background())
496 log.Warning("Cannot fetch access token: ", err, " - retrying ")
497 time.Sleep(time.Second)
500 log.Debug("token: ", token)
501 log.Debug("TokenValue: ", token.AccessToken)
502 log.Debug("Expiration: ", token.Expiry)
503 modExpiry = token.Expiry.Add(-time.Minute)
504 log.Debug("Modified expiration: ", modExpiry)
505 currentToken = token.AccessToken
508 diff = modExpiry.Sub(time.Now())
513 func retrive_token(c *kafka.Consumer) {
514 log.Debug("Get token inline")
515 conf := &clientcredentials.Config{
516 ClientID: creds_client_id,
517 ClientSecret: creds_client_secret,
518 TokenURL: creds_service_url,
520 token, err := conf.Token(context.Background())
522 log.Warning("Cannot fetch access token: ", err)
523 c.SetOAuthBearerTokenFailure(err.Error())
526 extensions := map[string]string{}
527 log.Debug("token: ", token)
528 log.Debug("TokenValue: ", token.AccessToken)
529 log.Debug("Expiration: ", token.Expiry)
530 t := token.Expiry.Add(-time.Minute)
531 log.Debug("Modified expiration: ", t)
532 oauthBearerToken := kafka.OAuthBearerToken{
533 TokenValue: token.AccessToken,
535 Extensions: extensions,
537 log.Debug("Setting new token to consumer")
538 setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
539 currentToken = token.AccessToken
540 if setTokenError != nil {
541 log.Warning("Cannot cannot set token in client: ", setTokenError)
542 c.SetOAuthBearerTokenFailure(setTokenError.Error())
546 func gzipWrite(w io.Writer, data *[]byte) error {
547 gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
553 _, err2 := gw.Write(*data)
557 func read_bridge_messages() {
559 consumer_type = "unsecure strimzi bridge consumer"
560 if creds_service_url != "" {
561 consumer_type = "accesstoken strimzi bridge consumer"
564 log.Debug("Cleaning consumer "+cid+" in group: ", gid)
565 ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
567 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - it may not exist - ok")
569 var bridge_base_url = ""
571 json_str := "{\"name\": \"" + cid + "\", \"auto.offset.reset\": \"latest\",\"format\": \"json\"}"
573 log.Debug("Creating consumer "+cid+" in group: ", gid)
574 var respJson map[string]interface{}
575 ok, respJson = send_http_request([]byte(json_str), http.MethodPost, bootstrapserver+"/consumers/"+gid, "application/vnd.kafka.v2+json", currentToken, 409, true) //409 if consumer already exists
577 bridge_base_url = fmt.Sprintf("%s", respJson["base_uri"])
579 log.Info("Failed create consumer "+cid+" in group: ", gid, " - retrying")
580 time.Sleep(time.Second)
585 json_str = "{\"topics\": [\"" + topic + "\"]}"
588 log.Debug("Subscribing to topic: ", topic)
589 ok, _ = send_http_request([]byte(json_str), http.MethodPost, bridge_base_url+"/subscription", "application/vnd.kafka.v2+json", currentToken, 0, false)
591 log.Info("Failed subscribe to topic: ", topic, " - retrying")
592 time.Sleep(time.Second)
597 log.Debug("Reading messages on topic: ", topic)
599 var req *http.Request
601 url := bridge_base_url + "/records"
603 req, err = http.NewRequest(http.MethodGet, url, nil)
605 log.Error("Cannot create http request method: GET, url: ", url)
606 time.Sleep(1 * time.Second)
609 req.Header.Set("accept", "application/vnd.kafka.json.v2+json")
611 if creds_service_url != "" {
612 if currentToken != "" {
613 req.Header.Add("authorization", currentToken)
615 log.Error("Cannot create http request for url: ", url, " - token missing")
616 time.Sleep(1 * time.Second)
621 values := req.URL.Query()
622 values.Add("timeout", "10000")
623 req.URL.RawQuery = values.Encode()
627 resp, err2 := httpclient.Do(req)
629 log.Error("Cannot send http request, method: GET, url: ", url)
630 time.Sleep(1 * time.Second)
633 body, err := ioutil.ReadAll(resp.Body)
635 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
636 log.Debug("Accepted response code: ", resp.StatusCode)
639 log.Error("Cannot read body, method: GET, url: ", url, " resp: ", resp.StatusCode)
641 var responseJson []interface{}
642 err := json.Unmarshal(body, &responseJson)
644 log.Error("Received msg not json? - cannot unmarshal")
645 msg_corrupted_count++
647 if len(responseJson) == 0 {
648 log.Debug("No message")
651 for _, item := range responseJson {
652 j, err := json.MarshalIndent(item, "", " ")
654 log.Error("Message in array not json? - cannot unmarshal")
655 msg_corrupted_count++
658 if log_payload != "" {
659 fmt.Println("Message: " + string(j))
666 log.Debug("Commiting message")
667 ok, _ = send_http_request(nil, http.MethodPost, bridge_base_url+"/offsets", "", currentToken, 0, false)
669 log.Info("Failed to commit message")
673 log.Error("Bad response, method: GET, url: ", url, " resp: ", resp.StatusCode)
674 log.Error("Bad response, data: ", string(body))
681 func read_kafka_messages() {
682 var c *kafka.Consumer = nil
683 log.Info("Creating kafka consumer...")
686 if jwt_file == "" && creds_service_url == "" {
688 log.Info("unsecure consumer")
689 consumer_type = "kafka unsecure consumer"
690 c, err = kafka.NewConsumer(&kafka.ConfigMap{
691 "bootstrap.servers": bootstrapserver,
694 "auto.offset.reset": "latest",
697 log.Info("ssl consumer")
698 consumer_type = "kafka ssl consumer"
699 c, err = kafka.NewConsumer(&kafka.ConfigMap{
700 "bootstrap.servers": bootstrapserver,
703 "auto.offset.reset": "latest",
704 "security.protocol": "SSL",
705 "ssl.key.location": ssl_path + "/clt.key",
706 "ssl.certificate.location": ssl_path + "/clt.crt",
707 "ssl.ca.location": ssl_path + "/ca.crt",
712 panic("SSL cannot be configued with JWT_FILE or RAPP_AUTH_SERVICE_URL")
714 log.Info("sasl consumer")
715 consumer_type = "kafka sasl unsecure consumer"
716 c, err = kafka.NewConsumer(&kafka.ConfigMap{
717 "bootstrap.servers": bootstrapserver,
720 "auto.offset.reset": "latest",
721 "sasl.mechanism": "OAUTHBEARER",
722 "security.protocol": "SASL_PLAINTEXT",
726 log.Warning("Cannot create kafka consumer - retrying, error: ", err)
727 time.Sleep(1 * time.Second)
731 log.Info("Creating kafka consumer - ok")
732 log.Info("Start subscribing to topic: ", topic)
735 err = c.SubscribeTopics([]string{topic}, nil)
737 log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying -- error details: ", err)
739 log.Info("Topic reader subscribing on topic: ", topic)
744 fileModTime := time.Now()
747 fileInfo, err := os.Stat(jwt_file)
749 if fileModTime != fileInfo.ModTime() {
750 log.Debug("JWT file is updated")
751 fileModTime = fileInfo.ModTime()
752 fileBytes, err := ioutil.ReadFile(jwt_file)
754 log.Error("JWT file read error: ", err)
756 fileString := string(fileBytes)
757 log.Info("JWT: ", fileString)
759 t15 := time.Second * 300
761 oauthBearerToken := kafka.OAuthBearerToken{
762 TokenValue: fileString,
765 log.Debug("Setting new token to consumer")
766 setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
767 if setTokenError != nil {
768 log.Warning("Cannot cannot set token in client: ", setTokenError)
772 log.Debug("JWT file not updated - OK")
775 log.Error("JWT does not exist: ", err)
780 log.Debug(" Nothing to consume on topic: ", topic)
783 switch e := ev.(type) {
785 var pdata *[]byte = &e.Value
786 if gzipped_data != "" {
788 err = gzipWrite(&buf, pdata)
790 log.Warning("Cannot unzip data")
794 fmt.Println("Unzipped data")
798 buf := &bytes.Buffer{}
800 if err := json.Indent(buf, *pdata, "", " "); err != nil {
801 log.Warning("Received msg not json?")
803 fmt.Println(buf.String())
805 fmt.Println("Number of received json msgs: " + strconv.Itoa(msg_count))
810 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
812 case kafka.OAuthBearerTokenRefresh:
814 oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
822 fmt.Printf("Ignored %v\n", e)