Sample consumer to get kafka broker from ICS
[nonrtric.git] / sample-services / ics-producer-consumer / consumer / src / main / java / com / demo / consumer / controllers / ConsumerController.java
index 63cc215..30225c7 100644 (file)
@@ -32,8 +32,9 @@ import org.springframework.web.bind.annotation.RestController;
 import com.demo.consumer.repository.InfoType;
 import com.demo.consumer.repository.InfoTypes;
 import com.demo.consumer.repository.Job.Parameters;
+import com.demo.consumer.repository.Job.Parameters.KafkaDeliveryInfo;
 import com.demo.consumer.dme.ConsumerJobInfo;
-import com.demo.consumer.dme.ConsumerStatusInfo;
+import com.demo.consumer.dme.JobDataSchema;
 import com.demo.consumer.repository.Jobs;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -68,14 +69,22 @@ public class ConsumerController {
             this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner,
                     toJobParameters(request.jobDefinition));
         } catch (Exception e) {
-            log.error("Error adding the job" + infoJobId, e.getMessage());
+            log.error("Error adding the job " + infoJobId + "{}", e.getMessage());
         }
     }
 
     @PostMapping("/info-type-status")
     public void statusChange(@RequestBody String requestBody) {
-        ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class);
-        log.info("Add Status Job Info", request);
+        JobDataSchema request = gson.fromJson(requestBody, JobDataSchema.class);
+        log.debug("Body Received: {}" , requestBody);
+        try {
+            this.jobs.addJob(request.getInfo_type_id(), types.getType(request.getInfo_type_id()), "",
+                new Parameters(new KafkaDeliveryInfo(
+                        request.getJob_data_schema().getTopic(),
+                        request.getJob_data_schema().getBootStrapServers(), 0)));
+        } catch (Exception e) {
+            log.error("Error adding the info type " + request.getInfo_type_id() + "{}", e.getMessage());
+        }
     }
 
     private Parameters toJobParameters(Object jobData) {