*.zip
*.tar.gz
*.rar
+*.tgz
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
@Component
public class PropertiesHelper {
private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+ private static String kafkaServers = null;
public static Properties getProperties() throws Exception {
- Properties props = null;
+ Properties props = new Properties();
try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
- props = new Properties();
if (input == null) {
- log.error("Found no configuration file in resources");
- throw new Exception("Sorry, unable to find config.properties");
+ log.error("Failed to load configuration file 'config.properties'");
+ throw new IOException("Configuration file 'config.properties' not found");
}
props.load(input);
- String kafkaServers = System.getenv("KAFKA_SERVERS");
- if (kafkaServers != null) {
+ setBootstrapServers(props);
+ } catch (IOException e) {
+ log.error("Error reading configuration file: ", e);
+ throw e;
+ }
+ return props;
+ }
+
+ private static void setBootstrapServers(Properties props) {
+ if (kafkaServers != null && !kafkaServers.isEmpty()) {
+ props.setProperty("bootstrap.servers", kafkaServers);
+ log.info("Using actively bootstrap servers: {}", kafkaServers);
+ } else {
+ String kafkaServersEnv = System.getenv("KAFKA_SERVERS");
+ if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) {
+ kafkaServers = kafkaServersEnv;
props.setProperty("bootstrap.servers", kafkaServers);
- log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+ log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers);
} else {
- log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+ log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file");
}
- } catch (IOException e) {
- log.error("Error reading configuration file: ", e.getMessage());
}
- return props;
+ }
+
+ public static void setKafkaServers(String servers) {
+ kafkaServers = servers;
}
}
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import com.demo.producer.messages.PropertiesHelper;
import com.demo.producer.repository.Job.Parameters;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public void addJob(String id, InfoType type, String owner, Parameters parameters) {
Job job = new Job(id, type, owner, parameters);
+ setKafkaServersEnvironment(job);
this.put(job);
}
+ private void setKafkaServersEnvironment(Job job) {
+ String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers();
+ if (kafkaServers != null && !kafkaServers.isEmpty()) {
+ PropertiesHelper.setKafkaServers(kafkaServers);
+ logger.info("Setting variable bootStrapServers: {}", kafkaServers);
+ } else {
+ logger.warn("bootStrapServers is not set for job: {}", job.getId());
+ }
+ }
+
private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
#
vars:
time: 1000
- autostart: true
+ autostart: false
topic: mytopic #This topic is used only in autostart
spring:
echo "Kafka container is up and running. Starting producer and consumer..."
space
-#Using the autostart flag in the application.yaml
-echo "Start 1 Producer on mytopic"
-#curl -X GET http://localhost:8080/startProducer/mytopic
-space
-
-echo "Start 1 Consumer on mytopic"
-#curl -X GET http://localhost:8081/startConsumer/mytopic
-space
-
-sleep 10
-
echo "Sending type1 to ICS"
curl -X 'PUT' \
'http://localhost:8083/data-producer/v1/info-types/type1' \
"$schema":"http://json-schema.org/draft-07/schema#",
"title":"STD_Type1_1.0.0",
"description":"Type 1",
- "type":"object"
- }
+ "topic": "mytopic",
+ "bootStrapServers": "http://kafka-zkless:9092",
+ "numberOfMessages": 0
+ }
}'
echo "Getting types from ICS"
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
- "status_result_uri": "http://kafka-consumer:8081/info-type-status",
- "owner": "owner"
+ "status_result_uri": "http://kafka-consumer:8081/consumer/info-type-status",
+ "owner": "demo"
}'
echo "Getting Consumer Subscription Job infos from ICS"
curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
space
-sleep 5
+#TEST To set kafka broker in the consumer
+curl -X 'POST' \
+ 'http://localhost:8081/consumer/job/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_type_id": "type1",
+ "job_owner": "demo",
+ "job_definition": {
+ "deliveryInfo": {
+ "topic": "mytopic",
+ "bootStrapServers": "http://kafka-zkless:9092",
+ "numberOfMessages": 0
+ }
+ },
+ "job_result_uri": "http://kafka-producer:8080/producer/job",
+ "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+#Using the autostart flag in the application.yaml
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+#curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
echo "ICS Producer Docker logs "
docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
space