import com.demo.consumer.repository.InfoType;
import com.demo.consumer.repository.InfoTypes;
import com.demo.consumer.repository.Job.Parameters;
+import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo;
import com.demo.consumer.dme.ConsumerJobInfo;
-import com.demo.consumer.dme.ConsumerStatusInfo;
+import com.demo.consumer.dme.JobDataSchema;
import com.demo.consumer.repository.Jobs;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner,
toJobParameters(request.jobDefinition));
} catch (Exception e) {
- log.error("Error adding the job" + infoJobId, e.getMessage());
+ log.error("Error adding the job " + infoJobId + "{}", e.getMessage());
}
}
@PostMapping("/info-type-status")
public void statusChange(@RequestBody String requestBody) {
- ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class);
- log.info("Add Status Job Info", request);
+ JobDataSchema request = gson.fromJson(requestBody, JobDataSchema.class);
+ log.debug("Body Received: {}" , requestBody);
+ try {
+ this.jobs.addJob(request.getInfo_type_id(), types.getType(request.getInfo_type_id()), "",
+ new Parameters(new KafkaDeliveryInfo(
+ request.getJob_data_schema().getTopic(),
+ request.getJob_data_schema().getBootStrapServers(), 0)));
+ } catch (Exception e) {
+ log.error("Error adding the info type " + request.getInfo_type_id() + "{}", e.getMessage());
+ }
}
private Parameters toJobParameters(Object jobData) {
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.dme;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+
+import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo;
+import com.google.gson.Gson;
+
+@Data
+public class JobDataSchema {
+
+ public enum InfoJobStatusValues {
+ REGISTERED, UNREGISTERED
+ }
+ @SerializedName("info_type_id")
+ private String info_type_id;
+ @SerializedName("job_data_schema")
+ private DataSchema job_data_schema;
+ @SerializedName("status")
+ private InfoJobStatusValues status;
+
+ @Override
+ public String toString() {
+ return new Gson().toJson(this);
+ }
+
+ @Getter
+ @Setter
+ public class DataSchema {
+ private String title;
+ private String description;
+ @SerializedName("topic")
+ private String topic;
+ @SerializedName("bootStrapServers")
+ private String bootStrapServers;
+ }
+}
@Component
public class PropertiesHelper {
private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+ private static String kafkaServers = null;
public static Properties getProperties() throws Exception {
- Properties props = null;
+ Properties props = new Properties();
try (InputStream input = SimpleConsumer.class.getClassLoader().getResourceAsStream("config.properties")) {
- props = new Properties();
if (input == null) {
- log.error("Found no configuration file in resources");
- throw new Exception("Sorry, unable to find config.properties");
+ log.error("Failed to load configuration file 'config.properties'");
+ throw new IOException("Configuration file 'config.properties' not found");
}
props.load(input);
- String kafkaServers = System.getenv("KAFKA_SERVERS");
- if (kafkaServers != null) {
+ setBootstrapServers(props);
+ } catch (IOException e) {
+ log.error("Error reading configuration file: ", e);
+ throw e;
+ }
+ return props;
+ }
+
+ private static void setBootstrapServers(Properties props) {
+ if (kafkaServers != null && !kafkaServers.isEmpty()) {
+ props.setProperty("bootstrap.servers", kafkaServers);
+ log.info("Using actively bootstrap servers: {}", kafkaServers);
+ } else {
+ String kafkaServersEnv = System.getenv("KAFKA_SERVERS");
+ if (kafkaServersEnv != null && !kafkaServersEnv.isEmpty()) {
+ kafkaServers = kafkaServersEnv;
props.setProperty("bootstrap.servers", kafkaServers);
- log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+ log.info("Using environment variable KAFKA_SERVERS: {}", kafkaServers);
} else {
- log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+ log.info("Environment variable KAFKA_SERVERS not found, defaulting to config file");
}
- } catch (IOException e) {
- log.error("Error reading configuration file: ", e.getMessage());
}
- return props;
+ }
+
+ public static void setKafkaServers(String servers) {
+ kafkaServers = servers;
}
}
import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@ToString
public class Job {
+ @AllArgsConstructor
@Builder
public static class Parameters {
+ @AllArgsConstructor
@Builder
@EqualsAndHashCode
public static class KafkaDeliveryInfo {
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import com.demo.consumer.messages.PropertiesHelper;
import com.demo.consumer.repository.Job.Parameters;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public void addJob(String id, InfoType type, String owner, Parameters parameters) {
Job job = new Job(id, type, owner, parameters);
+ setKafkaServersEnvironment(job);
this.put(job);
}
+ private void setKafkaServersEnvironment(Job job) {
+ String kafkaServers = job.getParameters().getDeliveryInfo().getBootStrapServers();
+ if (kafkaServers != null && !kafkaServers.isEmpty()) {
+ PropertiesHelper.setKafkaServers(kafkaServers);
+ logger.info("Setting variable bootStrapServers: {}", kafkaServers);
+ } else {
+ logger.warn("bootStrapServers is not set for job: {}", job.getId());
+ }
+ }
+
private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
vars:
time: 1000
- autostart: true
+ autostart: false
topic: mytopic #This topic is used only in autostart
spring:
"title":"STD_Type1_1.0.0",
"description":"Type 1",
"topic": "mytopic",
- "bootStrapServers": "http://kafka-zkless:9092",
- "numberOfMessages": 0
+ "bootStrapServers": "kafka-zkless:9092"
}
}'
"job_definition": {
"deliveryInfo": {
"topic": "mytopic",
- "bootStrapServers": "http://kafka-zkless:9092",
- "numberOfMessages": 0
+ "bootStrapServers": "kafka-zkless:9092",
+ "numberOfMessages": 100
}
},
"job_result_uri": "http://kafka-producer:8080/producer/job",
curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
space
-#TEST To set kafka broker in the consumer
-curl -X 'POST' \
- 'http://localhost:8081/consumer/job/1' \
+#To Set Kafka Broker in Consumer
+echo "Sending type1 to ICS to use the callback"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-producer/v1/info-types/type1' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
- "info_type_id": "type1",
- "job_owner": "demo",
- "job_definition": {
- "deliveryInfo": {
- "topic": "mytopic",
- "bootStrapServers": "http://kafka-zkless:9092",
- "numberOfMessages": 0
+ "info_job_data_schema": {
+ "$schema":"http://json-schema.org/draft-07/schema#",
+ "title":"STD_Type1_1.0.0",
+ "description":"Type 1",
+ "topic": "mytopic",
+ "bootStrapServers": "kafka-zkless:9092"
}
- },
- "job_result_uri": "http://kafka-producer:8080/producer/job",
- "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
}'
#Using the autostart flag in the application.yaml
space
echo "Start 1 Consumer on mytopic"
-#curl -X GET http://localhost:8081/startConsumer/mytopic
+curl -X GET http://localhost:8081/startConsumer/mytopic
space
sleep 10