Fix distribution of Kafka messages
[nonrtric.git] / dmaap-mediator-producer / internal / restclient / HTTPClient.go
index c6eb24c..a7582c2 100644 (file)
@@ -22,11 +22,21 @@ package restclient
 
 import (
        "bytes"
+       "crypto/tls"
        "fmt"
        "io"
+       "math"
        "net/http"
+       "net/url"
+       "time"
+
+       "github.com/hashicorp/go-retryablehttp"
+       log "github.com/sirupsen/logrus"
 )
 
+const ContentTypeJSON = "application/json"
+const ContentTypePlain = "text/plain"
+
 // HTTPClient interface
 type HTTPClient interface {
        Get(url string) (*http.Response, error)
@@ -43,39 +53,35 @@ func (pe RequestError) Error() string {
        return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
 }
 
-var (
-       Client HTTPClient
-)
-
-func init() {
-       Client = &http.Client{}
-}
-
-func Get(url string) ([]byte, error) {
-       if response, err := Client.Get(url); err == nil {
-               defer response.Body.Close()
-               if responseData, err := io.ReadAll(response.Body); err == nil {
-                       if isResponseSuccess(response.StatusCode) {
+func Get(url string, client HTTPClient) ([]byte, error) {
+       if response, err := client.Get(url); err == nil {
+               if isResponseSuccess(response.StatusCode) {
+                       defer response.Body.Close()
+                       if responseData, err := io.ReadAll(response.Body); err == nil {
                                return responseData, nil
                        } else {
-                               requestError := RequestError{
-                                       StatusCode: response.StatusCode,
-                                       Body:       responseData,
-                               }
-                               return nil, requestError
+                               return nil, err
                        }
                } else {
-                       return nil, err
+                       return nil, getRequestError(response)
                }
        } else {
                return nil, err
        }
 }
 
-func Put(url string, body []byte) error {
-       if req, reqErr := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)); reqErr == nil {
-               req.Header.Set("Content-Type", "application/json; charset=utf-8")
-               if response, respErr := Client.Do(req); respErr == nil {
+func Put(url string, body []byte, client HTTPClient) error {
+       return do(http.MethodPut, url, body, ContentTypeJSON, client)
+}
+
+func Post(url string, body []byte, contentType string, client HTTPClient) error {
+       return do(http.MethodPost, url, body, contentType, client)
+}
+
+func do(method string, url string, body []byte, contentType string, client HTTPClient) error {
+       if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
+               req.Header.Set("Content-Type", contentType)
+               if response, respErr := client.Do(req); respErr == nil {
                        if isResponseSuccess(response.StatusCode) {
                                return nil
                        } else {
@@ -94,6 +100,7 @@ func isResponseSuccess(statusCode int) bool {
 }
 
 func getRequestError(response *http.Response) RequestError {
+       defer response.Body.Close()
        responseData, _ := io.ReadAll(response.Body)
        putError := RequestError{
                StatusCode: response.StatusCode,
@@ -101,3 +108,70 @@ func getRequestError(response *http.Response) RequestError {
        }
        return putError
 }
+
+func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate, error) {
+       if cert, err := tls.LoadX509KeyPair(certPath, keyPath); err == nil {
+               return cert, nil
+       } else {
+               return tls.Certificate{}, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", certPath, keyPath, err)
+       }
+}
+
+func CreateRetryClient(cert tls.Certificate) *http.Client {
+       rawRetryClient := retryablehttp.NewClient()
+       rawRetryClient.Logger = leveledLogger{}
+       rawRetryClient.RetryWaitMax = time.Minute
+       rawRetryClient.RetryMax = math.MaxInt
+       rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
+
+       client := rawRetryClient.StandardClient()
+       return client
+}
+
+func CreateClientWithoutRetry(cert tls.Certificate, timeout time.Duration) *http.Client {
+       return &http.Client{
+               Timeout:   timeout,
+               Transport: getSecureTransportWithoutVerify(cert),
+       }
+}
+
+func getSecureTransportWithoutVerify(cert tls.Certificate) *http.Transport {
+       return &http.Transport{
+               TLSClientConfig: &tls.Config{
+                       Certificates: []tls.Certificate{
+                               cert,
+                       },
+                       InsecureSkipVerify: true,
+               },
+       }
+}
+
+func IsUrlSecure(configUrl string) bool {
+       u, _ := url.Parse(configUrl)
+       return u.Scheme == "https"
+}
+
+// Used to get leveled logging in the RetryClient
+type leveledLogger struct {
+}
+
+func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Error(msg)
+}
+func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Info(msg)
+}
+func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Debug(msg)
+}
+func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
+       log.WithFields(getFields(keysAndValues)).Warn(msg)
+}
+
+func getFields(keysAndValues []interface{}) log.Fields {
+       fields := log.Fields{}
+       for i := 0; i < len(keysAndValues); i = i + 2 {
+               fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1]
+       }
+       return fields
+}