- registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
- if types, err := jobtypes.GetTypes(); err == nil {
+ log.Debug("Starting DMaaP Mediator Producer")
+ wg := new(sync.WaitGroup)
+
+ // add two goroutines to `wg` WaitGroup, one for each running go routine
+ wg.Add(2)
+
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
+ go func() {
+ r := server.NewRouter(jobHandler)
+ log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
+ wg.Done()
+ }()
+
+ go func() {
+ jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
+ wg.Done()
+ }()
+
+ // wait until WaitGroup is done
+ wg.Wait()
+ log.Debug("Stopping DMaaP Mediator Producer")
+}
+
+func validateConfiguration(configuration *config.Config) error {
+ if configuration.InfoProducerHost == "" {
+ return fmt.Errorf("missing INFO_PRODUCER_HOST")
+ }
+ return nil
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
+ registrator := config.NewRegistratorImpl(infoCoordinatorAddress, retryClient)
+ if types, err := jobHandler.GetTypes(); err == nil {