Use retry when calling DMaaP and ECS 36/6936/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 26 Oct 2021 11:22:55 +0000 (13:22 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 26 Oct 2021 11:23:08 +0000 (13:23 +0200)
Issue-ID: NONRTRIC-617
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I43b0a3e8d7ebb439a22154b0246af5b05342af27

dmaap-mediator-producer/go.mod
dmaap-mediator-producer/go.sum
dmaap-mediator-producer/main.go
dmaap-mediator-producer/simulator/consumersimulator.go

index a47c542..ffea6a2 100644 (file)
@@ -10,6 +10,8 @@ require (
 require (
        github.com/davecgh/go-spew v1.1.1 // indirect
        github.com/gorilla/mux v1.8.0 // indirect
+       github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
+       github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
        github.com/stretchr/objx v0.1.0 // indirect
        golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
index e43ad87..8447fa0 100644 (file)
@@ -3,6 +3,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
+github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
+github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
index beeb995..1a91af4 100644 (file)
@@ -26,6 +26,7 @@ import (
        "sync"
        "time"
 
+       "github.com/hashicorp/go-retryablehttp"
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
@@ -33,11 +34,12 @@ import (
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
-const timeoutHTTPClient = time.Second * 5
-const timeoutPollClient = time.Second * 15
+const timeoutDistributionClient = time.Second * 5
+const retryWaitMax = time.Minute
+const retryMax = int(^uint(0) >> 1)
 
 var configuration *config.Config
-var httpClient restclient.HTTPClient
+var retryClient restclient.HTTPClient
 var jobHandler *jobs.JobHandlerImpl
 
 func init() {
@@ -52,13 +54,16 @@ func main() {
        }
        callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
 
-       httpClient = &http.Client{
-               Timeout: timeoutHTTPClient,
+       distributionClient := &http.Client{
+               Timeout: timeoutDistributionClient,
        }
-       pollClient := &http.Client{
-               Timeout: timeoutPollClient,
-       }
-       jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient)
+
+       rawRetryClient := retryablehttp.NewClient()
+       rawRetryClient.RetryWaitMax = retryWaitMax
+       rawRetryClient.RetryMax = retryMax
+       retryClient = rawRetryClient.StandardClient()
+
+       jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient)
        if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
                log.Fatalf("Stopping producer due to: %v", err)
        }
@@ -94,7 +99,7 @@ func validateConfiguration(configuration *config.Config) error {
 }
 
 func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
-       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient)
+       registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
        if types, err := jobHandler.GetTypes(); err == nil {
                if regErr := registrator.RegisterTypes(types); regErr != nil {
                        return fmt.Errorf("unable to register all types due to: %v", regErr)
index 03da6f4..df81385 100644 (file)
@@ -53,7 +53,12 @@ func registerJob(port int) {
                JobResultUri  string `json:"job_result_uri"`
                InfoTypeId    string `json:"info_type_id"`
                JobDefinition string `json:"job_definition"`
-       }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"}
+       }{
+               JobOwner:      fmt.Sprintf("test%v", port),
+               JobResultUri:  fmt.Sprintf("http://localhost:%v/jobs", port),
+               InfoTypeId:    "STD_Fault_Messages",
+               JobDefinition: "{}",
+       }
        fmt.Print("Registering consumer: ", jobInfo)
        body, _ := json.Marshal(jobInfo)
        putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)