From: elinuxhenrik Date: Tue, 26 Oct 2021 11:22:55 +0000 (+0200) Subject: Use retry when calling DMaaP and ECS X-Git-Tag: 1.2.0~61^2~2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F36%2F6936%2F1;p=nonrtric.git Use retry when calling DMaaP and ECS Issue-ID: NONRTRIC-617 Signed-off-by: elinuxhenrik Change-Id: I43b0a3e8d7ebb439a22154b0246af5b05342af27 --- diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod index a47c5425..ffea6a25 100644 --- a/dmaap-mediator-producer/go.mod +++ b/dmaap-mediator-producer/go.mod @@ -10,6 +10,8 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gorilla/mux v1.8.0 // indirect + github.com/hashicorp/go-cleanhttp v0.5.1 // indirect + github.com/hashicorp/go-retryablehttp v0.7.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.1.0 // indirect golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum index e43ad877..8447fa07 100644 --- a/dmaap-mediator-producer/go.sum +++ b/dmaap-mediator-producer/go.sum @@ -3,6 +3,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= +github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4= +github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index beeb995d..1a91af40 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/hashicorp/go-retryablehttp" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" @@ -33,11 +34,12 @@ import ( "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) -const timeoutHTTPClient = time.Second * 5 -const timeoutPollClient = time.Second * 15 +const timeoutDistributionClient = time.Second * 5 +const retryWaitMax = time.Minute +const retryMax = int(^uint(0) >> 1) var configuration *config.Config -var httpClient restclient.HTTPClient +var retryClient restclient.HTTPClient var jobHandler *jobs.JobHandlerImpl func init() { @@ -52,13 +54,16 @@ func main() { } callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort) - httpClient = &http.Client{ - Timeout: timeoutHTTPClient, + distributionClient := &http.Client{ + Timeout: timeoutDistributionClient, } - pollClient := &http.Client{ - Timeout: timeoutPollClient, - } - jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient) + + rawRetryClient := retryablehttp.NewClient() + rawRetryClient.RetryWaitMax = retryWaitMax + rawRetryClient.RetryMax = retryMax + retryClient = rawRetryClient.StandardClient() + + jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient) if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil { log.Fatalf("Stopping producer due to: %v", err) } @@ -94,7 +99,7 @@ func validateConfiguration(configuration *config.Config) error { } func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error { - registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient) + registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient) if types, err := jobHandler.GetTypes(); err == nil { if regErr := registrator.RegisterTypes(types); regErr != nil { return fmt.Errorf("unable to register all types due to: %v", regErr) diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go index 03da6f4e..df813857 100644 --- a/dmaap-mediator-producer/simulator/consumersimulator.go +++ b/dmaap-mediator-producer/simulator/consumersimulator.go @@ -53,7 +53,12 @@ func registerJob(port int) { JobResultUri string `json:"job_result_uri"` InfoTypeId string `json:"info_type_id"` JobDefinition string `json:"job_definition"` - }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"} + }{ + JobOwner: fmt.Sprintf("test%v", port), + JobResultUri: fmt.Sprintf("http://localhost:%v/jobs", port), + InfoTypeId: "STD_Fault_Messages", + JobDefinition: "{}", + } fmt.Print("Registering consumer: ", jobInfo) body, _ := json.Marshal(jobInfo) putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)