gerrit-change-number: ${{ inputs.GERRIT_CHANGE_NUMBER }}
gerrit-patchset-number: ${{ inputs.GERRIT_PATCHSET_NUMBER }}
vote-type: clear
- comment-only: true
- name: Allow replication
run: sleep 10s
gerrit-change-number: ${{ inputs.GERRIT_CHANGE_NUMBER }}
gerrit-patchset-number: ${{ inputs.GERRIT_PATCHSET_NUMBER }}
vote-type: ${{ env.WORKFLOW_CONCLUSION }}
- comment-only: true
throw new CapifAccessException("Unexpected error");\r
}\r
\r
- //TODO The below should be uncommented once SME Manager provides an accessible URI\r
-\r
-// String helloWorldEndpoint = "";\r
-// List<String> apiSetEndpoints = getApiSetEndpoints(apiResponse, baseUrl);\r
-// if (apiSetEndpoints != null && !apiSetEndpoints.isEmpty()) {\r
-// helloWorldEndpoint = apiSetEndpoints.get(0);\r
-// }\r
-//\r
-// if (helloWorldEndpoint != null && !helloWorldEndpoint.isEmpty()) {\r
-// try {\r
-// String responseHelloWorld = restTemplate.getForObject(helloWorldEndpoint, String.class);\r
-// logger.info("Response :- ", responseHelloWorld);\r
-// } catch (IllegalArgumentException e) {\r
-// throw new CapifAccessException("Error accessing the URL :- " + helloWorldEndpoint);\r
-// } catch (Exception e) {\r
-// throw new CapifAccessException("Unexpected error");\r
-// }\r
-// }\r
+ String helloWorldEndpoint = "";\r
+ List<String> apiSetEndpoints = getApiSetEndpoints(apiResponse, baseUrl);\r
+ if (apiSetEndpoints != null && !apiSetEndpoints.isEmpty()) {\r
+ helloWorldEndpoint = apiSetEndpoints.get(0);\r
+ }\r
+\r
+ if (helloWorldEndpoint != null && !helloWorldEndpoint.isEmpty()) {\r
+ try {\r
+ String responseHelloWorld = restTemplate.getForObject(helloWorldEndpoint, String.class);\r
+ logger.info("rApp SME Provider Response : {}", responseHelloWorld);\r
+ } catch (IllegalArgumentException e) {\r
+ throw new CapifAccessException("Error accessing the URL :- " + helloWorldEndpoint);\r
+ } catch (Exception e) {\r
+ throw new CapifAccessException("Unexpected error");\r
+ }\r
+ }\r
}\r
}\r
\r
*.zip
*.tar.gz
*.rar
+*.tgz
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
// make the consumer available for graceful shutdown
setKafkaConsumer(consumer);
consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
- //consumer.seekToBeginning(consumer.assignment()); //--from-beginning
int recNum = numOfRecs;
while (recNum > 0) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
@RequestMapping(path = "/producer", produces = "application/json")
public class ProducerController {
private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
-
private static Gson gson = new GsonBuilder().create();
-
private final Jobs jobs;
private final InfoTypes types;
private String topicName = "mytopic";
-
public ProducerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
this.jobs = jobs;
this.types = types;
@Component
public class PropertiesHelper {
private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+ private static String kafkaServers = null;
public static Properties getProperties() throws Exception {
- Properties props = null;
+ Properties props = new Properties();
try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
- props = new Properties();
if (input == null) {
- log.error("Found no configuration file in resources");
- throw new Exception("Sorry, unable to find config.properties");
+ log.error("Failed to load configuration file 'config.properties'");
+ throw new IOException("Configuration file 'config.properties' not found");
}
props.load(input);
- String kafkaServers = System.getenv("KAFKA_SERVERS");
- if (kafkaServers != null) {
+ setBootstrapServers(props);
+ } catch (IOException e) {
+ log.error("Error reading configuration file", e);\r
+ throw e;
+ }
+ return props;
+ }
+
+ private static void setBootstrapServers(Properties props) {
+ if (kafkaServers != null && !kafkaServers.isEmpty()) {
+ props.setProperty("bootstrap.servers", kafkaServers);
+ log.info("Using actively set bootstrap servers: {}", kafkaServers);
+ } else {
+ String kafkaServersEnv = System.getenv("KAFKA_SERVERS");
+ if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) {
+ kafkaServers = kafkaServersEnv;
props.setProperty("bootstrap.servers", kafkaServers);
- log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+ log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers);
} else {
- log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+ log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file");
}
- } catch (IOException e) {
- log.error("Error reading configuration file: ", e.getMessage());
}
- return props;
+ }
+
+ public static void setKafkaServers(String servers) {
+ kafkaServers = servers;
}
}
@Getter
@Setter
private Object inputJobDefinition;
-
}
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import com.demo.producer.messages.PropertiesHelper;
import com.demo.producer.repository.Job.Parameters;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public void addJob(String id, InfoType type, String owner, Parameters parameters) {
Job job = new Job(id, type, owner, parameters);
+ setKafkaServersEnvironment(job);
this.put(job);
}
+ private void setKafkaServersEnvironment(Job job) {
+ String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers();
+ if (kafkaServers != null && !kafkaServers.isEmpty()) {
+ PropertiesHelper.setKafkaServers(kafkaServers);
+ logger.info("Setting variable bootStrapServers: {}", kafkaServers);
+ } else {
+ logger.warn("bootStrapServers is not set for job: {}", job.getId());
+ }
+ }
+
private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
#
vars:
time: 1000
- autostart: true
+ autostart: false
topic: mytopic #This topic is used only in autostart
spring:
echo "Kafka container is up and running. Starting producer and consumer..."
space
-echo "Start 1 Producer on mytopic"
-curl -X GET http://localhost:8080/startProducer/mytopic
-space
-
-echo "Start 1 Consumer on mytopic"
-curl -X GET http://localhost:8081/startConsumer/mytopic
-space
-
-sleep 10
-
echo "Sending type1 to ICS"
curl -X 'PUT' \
'http://localhost:8083/data-producer/v1/info-types/type1' \
"$schema":"http://json-schema.org/draft-07/schema#",
"title":"STD_Type1_1.0.0",
"description":"Type 1",
- "type":"object"
- }
+ "topic": "mytopic",
+ "bootStrapServers": "http://kafka-zkless:9092",
+ "numberOfMessages": 0
+ }
}'
echo "Getting types from ICS"
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
- "status_result_uri": "http://kafka-consumer:8081/info-type-status",
- "owner": "owner"
+ "status_result_uri": "http://kafka-consumer:8081/consumer/info-type-status",
+ "owner": "demo"
}'
echo "Getting Consumer Subscription Job infos from ICS"
curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
space
-sleep 5
+# TEST: set the Kafka broker in the consumer via a consumer job request
+curl -X 'POST' \
+ 'http://localhost:8081/consumer/job/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_type_id": "type1",
+ "job_owner": "demo",
+ "job_definition": {
+ "deliveryInfo": {
+ "topic": "mytopic",
+ "bootStrapServers": "http://kafka-zkless:9092",
+ "numberOfMessages": 0
+ }
+ },
+ "job_result_uri": "http://kafka-producer:8080/producer/job",
+ "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+# Using the autostart flag in application.yaml
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+#curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
echo "ICS Producer Docker logs "
docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
space
for container in "${containers[@]}"; do
if docker logs "$container" | grep -q ERROR; then
echo "Errors found in logs of $container"
+ docker logs "$container" | grep ERROR
echo "FAIL"
exit 1
else
fi
}
-# Function to wait for a Docker container to be running and log a specific string
+# Function to wait for a Docker container to be running and log a specific string with a maximum timeout of 20 minutes
wait_for_container() {
local container_name="$1"
local log_string="$2"
+ local timeout=1200 # Timeout of 20 minutes (20 * 60 seconds)
+
+ local start_time=$(date +%s)
+ local end_time=$((start_time + timeout))
while ! docker inspect "$container_name" &>/dev/null; do
echo "Waiting for container '$container_name' to be created..."
sleep 5
+ if [ "$(date +%s)" -ge "$end_time" ]; then
+ echo "Timeout: Container creation exceeded 20 minutes."
+ exit 1
+ fi
done
while [ "$(docker inspect -f '{{.State.Status}}' "$container_name")" != "running" ]; do
echo "Waiting for container '$container_name' to be running..."
sleep 5
+ if [ "$(date +%s)" -ge "$end_time" ]; then
+ echo "Timeout: Container start exceeded 20 minutes."
+ exit 1
+ fi
done
# Check container logs for the specified string
while ! docker logs "$container_name" 2>&1 | grep "$log_string"; do
echo "Waiting for '$log_string' in container logs of '$container_name'..."
sleep 5
+ if [ "$(date +%s)" -ge "$end_time" ]; then
+ echo "Timeout: Log string not found within 20 minutes."
+ exit 1
+ fi
done
}
+
space() {
echo ""
echo "++++++++++++++++++++++++++++++++++++++++++++++++++++"