Add kafka jobs to DMaaP Mediator Producer
[nonrtric.git] / dmaap-mediator-producer / main.go
index 1aabdda..819ffa9 100644 (file)
@@ -29,6 +29,7 @@ import (
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
@@ -55,9 +56,12 @@ func main() {
        } else {
                log.Fatalf("Stopping producer due to error: %v", err)
        }
+
        retryClient := restclient.CreateRetryClient(cert)
+       kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers}
+       distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second)
 
-       jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
+       jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient)
        go startCallbackServer(jobsManager, callbackAddress)
 
        if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
@@ -78,11 +82,14 @@ func validateConfiguration(configuration *config.Config) error {
        if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
                return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
        }
+       if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" {
+               return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided")
+       }
        return nil
 }
 func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
        registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
-       configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+       configTypes, err := config.GetJobTypesFromConfiguration("configs")
        if err != nil {
                return fmt.Errorf("unable to register all types due to: %v", err)
        }