+ log.Debug("Starting DMaaP Mediator Producer")
+ go func() {
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ r := server.NewRouter(jobsManager)
+ log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r))
+ }()
+
+ keepProducerAlive()
+}
+
+func validateConfiguration(configuration *config.Config) error {
+ if configuration.InfoProducerHost == "" {
+ return fmt.Errorf("missing INFO_PRODUCER_HOST")
+ }
+ if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
+ return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
+ }
+ return nil
+}
+
+func createClientCertificate() (*tls.Certificate, error) {
+ if cert, err := tls.LoadX509KeyPair(configuration.ProducerCertPath, configuration.ProducerKeyPath); err == nil {
+ return &cert, nil
+ } else {
+ return nil, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", configuration.ProducerCertPath, configuration.ProducerKeyPath, err)
+ }
+}
+
+func createRetryClient(cert *tls.Certificate) *http.Client {
+ rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.RetryWaitMax = time.Minute
+ rawRetryClient.RetryMax = int(^uint(0) >> 1)
+ rawRetryClient.HTTPClient.Transport = &http.Transport{
+ TLSClientConfig: &tls.Config{
+ Certificates: []tls.Certificate{
+ *cert,
+ },
+ InsecureSkipVerify: true,
+ },
+ }
+
+ return rawRetryClient.StandardClient()
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
+ registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
+ if types, err := jobHandler.LoadTypesFromConfiguration(); err == nil {