Added regexp filtering of DMaaP messages.
Fixed a problem in Kafka listening.
Fixed problems in integration tests.
Added a test for creating a Kafka job, to improve coverage.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Id969a94dbcd2f52d6c3487f03c7e22b9c6582580
"description": "OK",
"content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
},
+ "400": {
+ "description": "Other error in the request",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}}
+ },
"404": {
"description": "Information type is not found",
"content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}}
application/json:
schema:
$ref: '#/components/schemas/void'
+ 400:
+ description: Other error in the request
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/error_information'
404:
description: Information type is not found
content:
/**
* Generic reactive REST client.
*/
+@SuppressWarnings("java:S4449") // Cannot add @Nullable to a third-party API
public class AsyncRestClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
- MediaType mediaType) {
+ @Nullable MediaType mediaType) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
import java.util.ArrayList;
import java.util.Collection;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
content = @Content(schema = @Schema(implementation = VoidResponse.class))), //
@ApiResponse(responseCode = "404", description = "Information type is not found", //
content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))), //
+ @ApiResponse(responseCode = "400", description = "Other error in the request", //
+ content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))) //
})
public ResponseEntity<Object> jobCreatedCallback( //
@RequestBody String body) {
this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
request.lastUpdated, toJobParameters(request.jobData));
return new ResponseEntity<>(HttpStatus.OK);
- } catch (Exception e) {
+ } catch (ServiceException e) {
+ logger.warn("jobCreatedCallback failed: {}", e.getMessage());
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+ } catch (Exception e) {
+ logger.warn("jobCreatedCallback failed: {}", e.getMessage());
+ return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
}
}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
/**
return new Vector<>(innerMap.values());
}
+ public Set<String> keySet() {
+ return this.map.keySet();
+ }
+
public void clear() {
this.map.clear();
}
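This change keys the Kafka consumers by typeId and then jobId, which is what MultiMap provides. A minimal sketch of the shape assumed by the calls in this diff (put(typeId, jobId, v), remove(typeId, jobId), get(typeId), keySet(), clear()); the real class may differ in details:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

// Two-level map: outer key is typeId, inner key is jobId.
public class MultiMap<T> {
    private final Map<String, Map<String, T>> map = new HashMap<>();

    public void put(String key1, String key2, T value) {
        map.computeIfAbsent(key1, k -> new HashMap<>()).put(key2, value);
    }

    public T remove(String key1, String key2) {
        Map<String, T> inner = map.get(key1);
        if (inner == null) {
            return null;
        }
        T removed = inner.remove(key2);
        if (inner.isEmpty()) {
            map.remove(key1); // keeps keySet() empty when no consumers remain, as the tests assert
        }
        return removed;
    }

    public Collection<T> get(String key1) {
        Map<String, T> inner = map.get(key1);
        return inner == null ? new Vector<>() : new Vector<>(inner.values());
    }

    public Set<String> keySet() {
        return map.keySet();
    }

    public void clear() {
        map.clear();
    }
}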
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
/**
private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
private final AsyncRestClient dmaapRestClient;
- private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
protected final ApplicationConfig applicationConfig;
protected final InfoType type;
protected final Jobs jobs;
- /** Submits new elements until stopped */
- private static class InfiniteFlux {
- private FluxSink<Integer> sink;
- private int counter = 0;
-
- public synchronized Flux<Integer> start() {
- stop();
- return Flux.create(this::next).doOnRequest(this::onRequest);
- }
-
- public synchronized void stop() {
- if (this.sink != null) {
- this.sink.complete();
- this.sink = null;
- }
- }
-
- void onRequest(long no) {
- logger.debug("InfiniteFlux.onRequest {}", no);
- for (long i = 0; i < no; ++i) {
- sink.next(counter++);
- }
- }
-
- void next(FluxSink<Integer> sink) {
- logger.debug("InfiniteFlux.next");
- this.sink = sink;
- sink.next(counter++);
- }
- }
-
public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
}
public void start() {
- infiniteSubmitter.start() //
+ Flux.range(0, Integer.MAX_VALUE) //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
.flatMap(this::pushDataToConsumers) //
.subscribe(//
null, //
throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); //
+ this::onComplete); //
+ }
+ private void onComplete() {
+ logger.warn("DmaapMessageConsumer completed {}", type.getId());
+ start();
}
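The hand-rolled InfiniteFlux is replaced by Flux.range, which already is an effectively infinite, backpressure-aware source; flatMap with concurrency 1 keeps a single DMaaP poll in flight at a time, and onComplete restarts the loop so a completed stream does not silently stop consumption. A self-contained sketch of the same pattern, assuming only reactor-core on the classpath (names are hypothetical):

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PollingLoopSketch {
    public static void main(String[] args) throws InterruptedException {
        Flux.range(0, Integer.MAX_VALUE)
            .flatMap(n -> simulatedPoll(n), 1) // next poll starts only after the previous completes
            .take(3) // bounded here only so the demo terminates
            .subscribe(System.out::println);
        Thread.sleep(1000); // let the async pipeline finish before the JVM exits
    }

    private static Mono<String> simulatedPoll(int n) {
        return Mono.just("poll-" + n).delayElement(Duration.ofMillis(100));
    }
}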
private String getDmaapUrl() {
// Distribute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
+ .filter(job -> job.isFilterMatch(body)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
.flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
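The isFilterMatch call above is where the new per-job regexp filter is applied. A hedged sketch of what such a check could look like; whether the adapter uses matches() or find() is an assumption (the test filter ".*DmaapResponse.*" works with either), and a missing filter is taken to match everything:

import java.util.regex.Pattern;

// Hypothetical stand-in for the filter logic in Job.
public class FilterSketch {
    private final Pattern filterPattern;

    public FilterSketch(String filterRegexp) {
        this.filterPattern = (filterRegexp == null || filterRegexp.isEmpty()) ? null
                : Pattern.compile(filterRegexp);
    }

    public boolean isFilterMatch(String body) {
        return filterPattern == null || filterPattern.matcher(body).matches();
    }
}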
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
- t -> stop(), //
+ this::handleExceptionInStream, //
() -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
}
+ private void handleExceptionInStream(Throwable t) {
+ logger.warn("KafkaMessageConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ stop();
+ }
+
private Mono<String> postToClient(String body) {
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.MultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
@Getter
- private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
+ private final MultiMap<KafkaJobDataConsumer> consumers = new MultiMap<>(); // Key is typeId, jobId
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
}
public synchronized void addJob(Job job) {
- if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ if (job.getType().isKafkaTopicDefined()) {
+ removeJob(job);
logger.debug("Kafka job added {}", job.getId());
KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+ if (consumers.get(job.getType().getId()).isEmpty()) {
+ topicConsumer.start();
+ }
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
subscription.start(topicConsumer.getOutput());
- consumers.put(job.getId(), subscription);
+ consumers.put(job.getType().getId(), job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
- KafkaJobDataConsumer d = consumers.remove(job.getId());
+ KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId());
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
d.stop();
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
public synchronized void restartNonRunningTasks() {
-
- for (KafkaJobDataConsumer consumer : consumers.values()) {
- if (!consumer.isRunning()) {
- restartTopic(consumer);
- }
- }
+ this.consumers.keySet().forEach(typeId -> {
+ this.consumers.get(typeId).forEach(consumer -> {
+ if (!consumer.isRunning()) {
+ restartTopic(consumer);
+ }
+ });
+ });
}
private void restartTopic(KafkaJobDataConsumer consumer) {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.forEach((jobId, consumer) -> {
- if (consumer.getJob().getType().getId().equals(type.getId())) {
- consumer.start(topic.getOutput());
- }
+ this.consumers.get(type.getId()).forEach((consumer) -> {
+ consumer.start(topic.getOutput());
});
}
}
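restartConsumersOfType re-attaches every consumer of a type to the listener's shared sink. A sketch of the start/stop/isRunning trio assumed by the supervision code above, using reactor's Sinks.Many as the diff does (method names mirror the calls in the diff; the real KafkaJobDataConsumer may differ):

import reactor.core.Disposable;
import reactor.core.publisher.Sinks;

public class JobDataConsumerSketch {
    private Disposable subscription;

    public synchronized void start(Sinks.Many<String> output) {
        stop(); // drop any previous subscription before re-attaching
        this.subscription = output.asFlux()
            .subscribe(msg -> System.out.println("deliver to job: " + msg));
    }

    public synchronized void stop() {
        if (subscription != null) {
            subscription.dispose();
            subscription = null;
        }
    }

    public synchronized boolean isRunning() {
        return subscription != null && !subscription.isDisposed();
    }
}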
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
this.type = type;
- start();
}
public Many<String> getOutput() {
// An object with a single, optional "filter" property
String schemaStr = "{" //
+ "\"type\": \"object\"," //
- + "\"properties\": {}," //
+ + "\"properties\": {" //
+ + " \"filter\": { \"type\": \"string\" }" //
+ + "}," //
+ "\"additionalProperties\": false" //
+ "}"; //
return jsonObject(schemaStr);
}
}
private String readSchemaFile(String filePath) throws IOException, ServiceException {
"type": "integer"
}
},
+ "additionalProperties": false,
"required": [
"maxSize",
"maxTimeMiliseconds"
]
}
},
- "required": []
+ "additionalProperties": false
}
\ No newline at end of file
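With "additionalProperties": false the job parameters may now contain only known keys. A hypothetical parameters document the schema should accept, built with Gson; the "bufferTimeout" and "maxConcurrency" property names are inferred from Job.Parameters in the tests below, not from the schema text shown here:

import com.google.gson.JsonObject;

public class ParamsExampleSketch {
    public static void main(String[] args) {
        JsonObject bufferTimeout = new JsonObject();
        bufferTimeout.addProperty("maxSize", 123); // required by the schema
        bufferTimeout.addProperty("maxTimeMiliseconds", 456); // required by the schema

        JsonObject params = new JsonObject();
        params.addProperty("filter", ".*DmaapResponse.*");
        params.add("bufferTimeout", bufferTimeout); // assumed property name
        params.addProperty("maxConcurrency", 1); // assumed property name

        // With additionalProperties:false, any key outside the schema would be rejected.
        System.out.println(params);
    }
}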
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.r1.ProducerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
}
private ConsumerJobInfo consumerJobInfo() {
- InfoType type = this.types.getAll().iterator().next();
- return consumerJobInfo(type.getId(), "EI_JOB_ID");
+ return consumerJobInfo("DmaapInformationType", "EI_JOB_ID");
}
private Object jsonObject() {
// Register producer, Register types
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
String jobs = restClient().get(jobUrl).block();
- assertThat(jobs).contains("ExampleInformationType");
+ assertThat(jobs).contains(JOB_ID);
// Delete the job
this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-
}
@Test
void testReRegister() throws Exception {
// Wait for registration of types and producer
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Clear the registration, should trigger a re-register
ecsSimulatorController.testResults.reset();
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Just clear the registered types, should trigger a re-register
ecsSimulatorController.testResults.types.clear();
await().untilAsserted(
- () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1));
+ () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()));
+ }
+
+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+ // Create a job
+ this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ // Delete the job
+ this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
}
return true;
}
-
}
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
-import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.springframework.beans.factory.annotation.Autowired;
})
class IntegrationWithEcs {
- private static final String EI_JOB_ID = "EI_JOB_ID";
+ private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
+ private static final String DMAAP_TYPE_ID = "DmaapInformationType";
@Autowired
private ApplicationConfig applicationConfig;
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.jobs.clear();
- this.types.clear();
+ assertThat(this.jobs.size()).isZero();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
}
private String jobUrl(String jobId) {
- return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
+ return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true";
}
- private void createInformationJobInEcs(String jobId) {
- String body = gson.toJson(consumerJobInfo());
+ private void createInformationJobInEcs(String typeId, String jobId, String filter) {
+ String body = gson.toJson(consumerJobInfo(typeId, filter));
try {
// Delete the job if it already exists
deleteInformationJobInEcs(jobId);
restClient().delete(jobUrl(jobId)).block();
}
- private ConsumerJobInfo consumerJobInfo() {
- InfoType type = this.types.getAll().iterator().next();
- return consumerJobInfo(type.getId(), EI_JOB_ID);
- }
-
- private Object jsonObject() {
- return jsonObject("{}");
+ private ConsumerJobInfo consumerJobInfo(String typeId, String filter) {
+ return consumerJobInfo(typeId, DMAAP_JOB_ID, filter);
}
private Object jsonObject(String json) {
}
}
- private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId) {
+ private String quote(String str) {
+ return "\"" + str + "\"";
+ }
+
+ private String consumerUri() {
+ return selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ }
+
+ private ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId, String filter) {
try {
- String targetUri = selfBaseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- return new ConsumerJobInfo(typeId, jsonObject(), "owner", targetUri, "");
+ String jsonStr = "{\"filter\": " + quote(filter) + "}";
+ return new ConsumerJobInfo(typeId, jsonObject(jsonStr), "owner", consumerUri(), "");
} catch (Exception e) {
return null;
}
}
+ @Test
+ void testCreateKafkaJob() {
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
+
+ ConsumerJobInfo jobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+ String body = gson.toJson(jobInfo);
+
+ restClient().putForEntity(jobUrl("KAFKA_JOB_ID"), body).block();
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ deleteInformationJobInEcs("KAFKA_JOB_ID");
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
@Test
void testWholeChain() throws Exception {
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
- createInformationJobInEcs(EI_JOB_ID);
+ createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
DmaapSimulatorController.dmaapResponses.add("DmaapResponse1");
DmaapSimulatorController.dmaapResponses.add("DmaapResponse2");
+ DmaapSimulatorController.dmaapResponses.add("Junk");
ConsumerController.TestResults results = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2));
assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
- deleteInformationJobInEcs(EI_JOB_ID);
+ deleteInformationJobInEcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
- "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"//
+ "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
})
class IntegrationWithKafka {
+ final String TYPE_ID = "KafkaInformationType";
+
@Autowired
private ApplicationConfig applicationConfig;
@Autowired
private KafkaTopicConsumers kafkaTopicConsumers;
- private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+ private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
}
- private Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) {
+ private static Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize,
+ int maxConcurrency) {
Job.Parameters param =
new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
String str = gson.toJson(param);
return jsonObject(str);
}
- private Object jsonObject(String json) {
+ private static Object jsonObject(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
} catch (Exception e) {
}
}
- private ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
+ ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
try {
- InfoType type = this.types.getAll().iterator().next();
- String typeId = type.getId();
String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- return new ConsumerJobInfo(typeId,
+ return new ConsumerJobInfo(TYPE_ID,
jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri,
"");
} catch (Exception e) {
return SenderOptions.create(props);
}
- private SenderRecord<Integer, String, Integer> senderRecord(String data, int i) {
- final InfoType infoType = this.types.getAll().iterator().next();
- return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i);
+ private SenderRecord<Integer, String, Integer> senderRecord(String data) {
+ final InfoType infoType = this.types.get(TYPE_ID);
+ int key = 1;
+ int correlationMetadata = 2;
+ return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), key, data), correlationMetadata);
}
private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
}
@Test
- void kafkaIntegrationTest() throws InterruptedException {
+ void kafkaIntegrationTest() throws Exception {
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
// Register producer, Register types
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+ var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend);
verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
- // Just for testing quoting
- this.consumerController.testResults.reset();
- dataToSend = Flux.just(senderRecord("Message\"_", 1));
- sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
-
// Delete the jobs
this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
}
@Test
// Register producer, Register types
await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs.
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+ restClient());
this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+ var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend); // this should overflow
- KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().get(TYPE_ID).iterator().next();
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
- kafkaTopicConsumers.restartNonRunningTasks();
this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ kafkaTopicConsumers.restartNonRunningTasks();
Thread.sleep(1000); // Restarting the input seems to take some async time
- dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
+ dataToSend = Flux.just(senderRecord("Howdy\""));
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Howdy_1");
+ verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
+
+ // Delete the jobs
+ this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
}
}
{
"types": [
{
- "id": "ExampleInformationType",
+ "id": "DmaapInformationType",
"dmaapTopicUrl": "/dmaap-topic-1",
"useHttpProxy": false
+ },
+ {
+ "id": "KafkaInformationType",
+ "kafkaInputTopic": "TutorialTopic",
+ "useHttpProxy": false
}
]
}
\ No newline at end of file
+++ /dev/null
-{
- "types": [
- {
- "id": "ExampleInformationType",
- "kafkaInputTopic": "TutorialTopic",
- "useHttpProxy": false
- }
- ]
-}
\ No newline at end of file