github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
+github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
+github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
"sync"
"time"
+ "github.com/hashicorp/go-retryablehttp"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
)
-const timeoutHTTPClient = time.Second * 5
-const timeoutPollClient = time.Second * 15
+const timeoutDistributionClient = time.Second * 5
+const retryWaitMax = time.Minute
+const retryMax = int(^uint(0) >> 1)
var configuration *config.Config
-var httpClient restclient.HTTPClient
+var retryClient restclient.HTTPClient
var jobHandler *jobs.JobHandlerImpl
func init() {
}
callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
- httpClient = &http.Client{
- Timeout: timeoutHTTPClient,
+ distributionClient := &http.Client{
+ Timeout: timeoutDistributionClient,
}
- pollClient := &http.Client{
- Timeout: timeoutPollClient,
- }
- jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient)
+
+ rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.RetryWaitMax = retryWaitMax
+ rawRetryClient.RetryMax = retryMax
+ retryClient = rawRetryClient.StandardClient()
+
+ jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, distributionClient)
if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
}
func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
- registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient)
+ registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
if types, err := jobHandler.GetTypes(); err == nil {
if regErr := registrator.RegisterTypes(types); regErr != nil {
return fmt.Errorf("unable to register all types due to: %v", regErr)
JobResultUri string `json:"job_result_uri"`
InfoTypeId string `json:"info_type_id"`
JobDefinition string `json:"job_definition"`
- }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"}
+ }{
+ JobOwner: fmt.Sprintf("test%v", port),
+ JobResultUri: fmt.Sprintf("http://localhost:%v/jobs", port),
+ InfoTypeId: "STD_Fault_Messages",
+ JobDefinition: "{}",
+ }
fmt.Print("Registering consumer: ", jobInfo)
body, _ := json.Marshal(jobInfo)
putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)