[
{
"id": "STD_Fault_Messages",
- "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages",
+ "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
"useHttpProxy": false
}
]
# configuration from the Consul will override the file.
configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
dmaap-base-url: http://dradmin:dradmin@localhost:2222
- # The url used to adress this component. This is used as a callback url sent to other components.
+ # The url used to adress this component. This is used as a callback url sent to other components.
dmaap-adapter-base-url: https://localhost:8435
+ # KAFKA boostrap server. This is only needed if there are Information Types that uses a kafkaInputTopic
+ kafka:
+ bootstrap-servers: localhost:9092
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ <version>1.3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.0.1-jre</version>
+ </dependency>
</dependencies>
<build>
<plugins>
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
-import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DmaapMessageConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
}
@Bean
- public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) {
+ public InfoTypes types(@Autowired ApplicationConfig appConfig) {
Collection<InfoType> types = appConfig.getTypes();
-
- // Start a consumer for each type
- for (InfoType type : types) {
- DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs);
- topicConsumer.start();
- }
-
return new InfoTypes(types);
}
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
}
public Mono<String> post(String uri, @Nullable String body) {
return postForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
- .flatMap(this::toBody);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: <empty>", traceTag);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> get(String uri) {
return getForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> delete(String uri) {
return deleteForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
}
}
- private Mono<String> toBody(ResponseEntity<String> entity) {
+ private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
- return Mono.just("");
+ return "";
} else {
- return Mono.just(entity.getBody());
+ return entity.getBody();
}
}
.build();
}
- private Mono<WebClient> getWebClient() {
+ private WebClient getWebClient() {
if (this.webClient == null) {
this.webClient = buildWebClient(baseUrl);
}
- return Mono.just(buildWebClient(baseUrl));
+ return this.webClient;
}
}
@Value("${app.dmaap-base-url}")
private String dmaapBaseUrl;
+ @Getter
+ @Value("${app.kafka.bootstrap-servers:}")
+ private String kafkaBootStrapServers;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
@RequestBody String body) {
try {
ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
-
- logger.info("Job started callback {}", request.id);
- Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
- request.lastUpdated);
- this.jobs.put(job);
+ logger.debug("Job started callback {}", request.id);
+ this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
+ request.lastUpdated, toJobParameters(request.jobData));
return new ResponseEntity<>(HttpStatus.OK);
} catch (Exception e) {
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
}
}
+ private Job.Parameters toJobParameters(Object jobData) {
+ String json = gson.toJson(jobData);
+ return gson.fromJson(json, Job.Parameters.class);
+ }
+
@GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for trouble shooting")
@ApiResponse(responseCode = "200", //
public ResponseEntity<Object> jobDeletedCallback( //
@PathVariable("infoJobId") String infoJobId) {
- logger.info("Job deleted callback {}", infoJobId);
+ logger.debug("Job deleted callback {}", infoJobId);
this.jobs.remove(infoJobId);
return new ResponseEntity<>(HttpStatus.OK);
}
import lombok.Getter;
+import org.springframework.util.StringUtils;
+
public class InfoType {
@Getter
@Getter
private final boolean useHttpProxy;
- public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) {
+ @Getter
+ private final String kafkaInputTopic;
+
+ public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) {
this.id = id;
this.dmaapTopicUrl = dmaapTopicUrl;
this.useHttpProxy = useHttpProxy;
+ this.kafkaInputTopic = kafkaInputTopic;
+ }
+
+ public boolean isKafkaTopicDefined() {
+ return StringUtils.hasLength(kafkaInputTopic);
+ }
+
+ public boolean isDmaapTopicDefined() {
+ return StringUtils.hasLength(dmaapTopicUrl);
}
}
private Map<String, InfoType> allTypes = new HashMap<>();
public InfoTypes(Collection<InfoType> types) {
-
for (InfoType type : types) {
put(type);
}
package org.oran.dmaapadapter.repository;
+import java.time.Duration;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import lombok.Getter;
+import org.immutables.gson.Gson;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+
public class Job {
+ @Gson.TypeAdapters
+ public static class Parameters {
+ @Getter
+ private String filter;
+ @Getter
+ private BufferTimeout bufferTimeout;
+
+ private int maxConcurrency;
+
+ public Parameters() {}
+
+ public Parameters(String filter, BufferTimeout bufferTimeout, int maxConcurrency) {
+ this.filter = filter;
+ this.bufferTimeout = bufferTimeout;
+ this.maxConcurrency = maxConcurrency;
+ }
+
+ public int getMaxConcurrency() {
+ return maxConcurrency == 0 ? 1 : maxConcurrency;
+ }
+ }
+
+ @Gson.TypeAdapters
+ public static class BufferTimeout {
+ public BufferTimeout(int maxSize, long maxTimeMiliseconds) {
+ this.maxSize = maxSize;
+ this.maxTimeMiliseconds = maxTimeMiliseconds;
+ }
+
+ public BufferTimeout() {}
+
+ @Getter
+ private int maxSize;
+
+ private long maxTimeMiliseconds;
+
+ public Duration getMaxTime() {
+ return Duration.ofMillis(maxTimeMiliseconds);
+ }
+ }
+
@Getter
private final String id;
@Getter
private final String owner;
+ @Getter
+ private final Parameters parameters;
+
@Getter
private final String lastUpdated;
- public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) {
+ private final Pattern jobDataFilter;
+
+ @Getter
+ private final AsyncRestClient consumerRestClient;
+
+ public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
+ AsyncRestClient consumerRestClient) {
this.id = id;
this.callbackUrl = callbackUrl;
this.type = type;
this.owner = owner;
this.lastUpdated = lastUpdated;
+ this.parameters = parameters;
+ if (parameters != null && parameters.filter != null) {
+ jobDataFilter = Pattern.compile(parameters.filter);
+ } else {
+ jobDataFilter = null;
+ }
+ this.consumerRestClient = consumerRestClient;
+ }
+
+ public boolean isFilterMatch(String data) {
+ if (jobDataFilter == null) {
+ return true;
+ }
+ Matcher matcher = jobDataFilter.matcher(data);
+ return matcher.find();
+ }
+
+ public boolean isBuffered() {
+ return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0
+ && parameters.bufferTimeout.maxTimeMiliseconds > 0;
}
}
package org.oran.dmaapadapter.repository;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Vector;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.repository.Job.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Jobs {
+ public interface Observer {
+ void onJobbAdded(Job job);
+
+ void onJobRemoved(Job job);
+ }
+
private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
+ private final AsyncRestClientFactory restclientFactory;
+ private final List<Observer> observers = new ArrayList<>();
- public Jobs() {}
+ public Jobs(@Autowired ApplicationConfig applicationConfig) {
+ restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+ }
public synchronized Job getJob(String id) throws ServiceException {
Job job = allJobs.get(id);
return allJobs.get(id);
}
- public synchronized void put(Job job) {
- logger.debug("Put service: {}", job.getId());
+ public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
+ Parameters parameters) {
+ AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
+ ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
+ : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
+ Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+ this.put(job);
+ synchronized (observers) {
+ this.observers.forEach(obs -> obs.onJobbAdded(job));
+ }
+ }
+
+ public void addObserver(Observer obs) {
+ synchronized (observers) {
+ this.observers.add(obs);
+ }
+ }
+
+ private synchronized void put(Job job) {
+ logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
}
return job;
}
- public synchronized void remove(Job job) {
- this.allJobs.remove(job.getId());
- jobsByType.remove(job.getType().getId(), job.getId());
+ public void remove(Job job) {
+ synchronized (this) {
+ this.allJobs.remove(job.getId());
+ jobsByType.remove(job.getType().getId(), job.getId());
+ }
+ synchronized (observers) {
+ this.observers.forEach(obs -> obs.onJobRemoved(job));
+ }
}
public synchronized int size() {
* The class fetches incoming requests from DMAAP and sends them further to the
* consumers that has a job for this InformationType.
*/
-
-public class DmaapMessageConsumer {
+public class DmaapTopicConsumer {
private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
- private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
- private final ApplicationConfig applicationConfig;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
+
private final AsyncRestClient dmaapRestClient;
- private final AsyncRestClient consumerRestClient;
- private final InfoType type;
- private final Jobs jobs;
private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+ protected final ApplicationConfig applicationConfig;
+ protected final InfoType type;
+ protected final Jobs jobs;
/** Submits new elements until stopped */
private static class InfiniteFlux {
}
}
- public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
- this.applicationConfig = applicationConfig;
+ public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
- this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
- : restclientFactory.createRestClientNoHttpProxy("");
+ this.applicationConfig = applicationConfig;
this.type = type;
this.jobs = jobs;
}
public void start() {
infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .flatMap(this::handleReceivedMessage, 5) //
+ .flatMap(this::pushDataToConsumers) //
.subscribe(//
- value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+ null, //
throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
- );
+ () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); //
+
}
private String getDmaapUrl() {
-
return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
}
.flatMap(notUsed -> Mono.empty());
}
- private Mono<String> handleConsumerErrorResponse(Throwable t) {
- logger.warn("error from CONSUMER {}", t.getMessage());
- return Mono.empty();
- }
-
- protected Mono<String> getFromMessageRouter(String topicUrl) {
+ private Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.onErrorResume(this::handleDmaapErrorResponse); //
}
- protected Flux<String> handleReceivedMessage(String body) {
- logger.debug("Received from DMAAP {}", body);
- final int CONCURRENCY = 5;
+ private Mono<String> handleConsumerErrorResponse(Throwable t) {
+ logger.warn("error from CONSUMER {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ protected Flux<String> pushDataToConsumers(String body) {
+ logger.debug("Received data {}", body);
+ final int CONCURRENCY = 50;
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
- .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
- .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+ .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
+ .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
-
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapTopicConsumers {
+
+ DmaapTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
+ // Start a consumer for each type
+ for (InfoType type : types.getAll()) {
+ if (type.isDmaapTopicDefined()) {
+ DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
+ topicConsumer.start();
+ }
+ }
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks.Many;
+
+/**
+ * The class streams data from a multi cast sink and sends the data to the Job
+ * owner via REST calls.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+
+public class KafkaJobDataConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
+ private final Many<String> input;
+ private final Job job;
+ private Disposable subscription;
+ private int errorCounter = 0;
+
+ KafkaJobDataConsumer(Many<String> input, Job job) {
+ this.input = input;
+ this.job = job;
+ }
+
+ public synchronized void start() {
+ stop();
+ this.subscription = getMessagesFromKafka(job) //
+ .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
+ .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+ .onErrorResume(this::handleError) //
+ .subscribe(this::handleConsumerSentOk, //
+ this::handleErrorInStream, //
+ () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
+ job.getType().getId()));
+ }
+
+ public synchronized void stop() {
+ if (this.subscription != null) {
+ subscription.dispose();
+ subscription = null;
+ }
+ }
+
+ public synchronized boolean isRunning() {
+ return this.subscription != null;
+ }
+
+ private Flux<String> getMessagesFromKafka(Job job) {
+ Flux<String> result = input.asFlux() //
+ .filter(job::isFilterMatch);
+
+ if (job.isBuffered()) {
+ result = result.bufferTimeout( //
+ job.getParameters().getBufferTimeout().getMaxSize(), //
+ job.getParameters().getBufferTimeout().getMaxTime()) //
+ .map(Object::toString);
+ }
+ return result;
+ }
+
+ private Mono<String> handleError(Throwable t) {
+ logger.warn("exception: {} job: {}", t.getMessage(), job);
+
+ final int STOP_AFTER_ERRORS = 5;
+ if (t instanceof WebClientResponseException) {
+ if (++this.errorCounter > STOP_AFTER_ERRORS) {
+ logger.error("Stopping job {}", job);
+ return Mono.error(t);
+ } else {
+ return Mono.empty(); // Discard
+ }
+ } else {
+ // This can happen if there is an overflow.
+ return Mono.empty();
+ }
+ }
+
+ private void handleConsumerSentOk(String data) {
+ this.errorCounter = 0;
+ }
+
+ private void handleErrorInStream(Throwable t) {
+ logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
+ this.subscription = null;
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Getter;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
+@EnableScheduling
+public class KafkaTopicConsumers {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
+
+ private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+ @Getter
+ private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+
+ private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
+
+ public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types,
+ @Autowired Jobs jobs) {
+
+ for (InfoType type : types.getAll()) {
+ if (type.isKafkaTopicDefined()) {
+ KafkaTopicListener topicConsumer = new KafkaTopicListener(appConfig, type);
+ topicListeners.put(type.getId(), topicConsumer);
+ }
+ }
+
+ jobs.addObserver(new Jobs.Observer() {
+ @Override
+ public void onJobbAdded(Job job) {
+ addJob(job);
+ }
+
+ @Override
+ public void onJobRemoved(Job job) {
+ removeJob(job);
+ }
+
+ });
+ }
+
+ public synchronized void addJob(Job job) {
+ if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ logger.debug("Kafka job added {}", job.getId());
+ KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+ KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
+ subscription.start();
+ activeSubscriptions.put(job.getId(), subscription);
+ }
+ }
+
+ public synchronized void removeJob(Job job) {
+ KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+ if (d != null) {
+ logger.debug("Kafka job removed {}", job.getId());
+ d.stop();
+ }
+ }
+
+ @Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
+ public synchronized void restartNonRunningTasks() {
+ for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+ if (!consumer.isRunning()) {
+ consumer.start();
+ }
+ }
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * The class streams incoming requests from a Kafka topic and sends them further
+ * to a multi cast sink, which several other streams can connect to.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaTopicListener {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
+ private final ApplicationConfig applicationConfig;
+ private final InfoType type;
+ private final Many<String> output;
+
+ public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
+ this.applicationConfig = applicationConfig;
+
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+ this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+ this.type = type;
+ startKafkaTopicReceiver();
+ }
+
+ public Many<String> getOutput() {
+ return this.output;
+ }
+
+ private Disposable startKafkaTopicReceiver() {
+ return KafkaReceiver.create(kafkaInputProperties()) //
+ .receive() //
+ .doOnNext(this::onReceivedData) //
+ .subscribe(null, //
+ this::onReceivedError, //
+ () -> logger.warn("KafkaTopicReceiver stopped"));
+ }
+
+ private void onReceivedData(ConsumerRecord<Integer, String> input) {
+ logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
+ output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+
+ private void onReceivedError(Throwable t) {
+ logger.error("KafkaTopicReceiver error: {}", t.getMessage());
+ }
+
+ private ReceiverOptions<Integer, String> kafkaInputProperties() {
+ Map<String, Object> consumerProps = new HashMap<>();
+ if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
+ logger.error("No kafka boostrap server is setup");
+ }
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return ReceiverOptions.<Integer, String>create(consumerProps)
+ .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
+ }
+
+}
package org.oran.dmaapadapter.tasks;
+import com.google.common.io.CharStreams;
import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
import lombok.Getter;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
logger.warn("Registration of producer failed {}", t.getMessage());
}
+ // Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
return jsonObject("{}");
}
- private ProducerInfoTypeInfo typeRegistrationInfo() {
- return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+ private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+ try {
+ return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+ } catch (Exception e) {
+ logger.error("Fatal error {}", e.getMessage());
+ return null;
+ }
+ }
+
+ private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+
+ if (type.isKafkaTopicDefined()) {
+ String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
+ return jsonObject(schemaStrKafka);
+ } else {
+ // An object with no properties
+ String schemaStr = "{" //
+ + "\"type\": \"object\"," //
+ + "\"properties\": {}," //
+ + "\"additionalProperties\": false" //
+ + "}"; //
+
+ return jsonObject(schemaStr);
+ }
}
- private Object jsonSchemaObject() {
- // An object with no properties
- String schemaStr = "{" //
- + "\"type\": \"object\"," //
- + "\"properties\": {}," //
- + "\"additionalProperties\": false" //
- + "}"; //
- return jsonObject(schemaStr);
+ private String readSchemaFile(String filePath) throws IOException, ServiceException {
+ InputStream in = getClass().getResourceAsStream(filePath);
+ logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+ if (in == null) {
+ throw new ServiceException("Could not readfile: " + filePath);
+ }
+ return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}
private Object jsonObject(String json) {
--- /dev/null
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ },
+ "maxConcurrency": {
+ "type": "integer"
+ },
+ "bufferTimeout": {
+ "type": "object",
+ "properties": {
+ "maxSize": {
+ "type": "integer"
+ },
+ "maxTimeMiliseconds": {
+ "type": "integer"
+ }
+ },
+ "required": [
+ "maxSize",
+ "maxTimeMiliseconds"
+ ]
+ }
+ },
+ "required": []
+}
\ No newline at end of file
public TestResults() {}
+ public boolean hasReceived(String str) {
+ for (String received : receivedBodies) {
+ if (received.equals(str)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void reset() {
receivedBodies.clear();
}
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
-
}
@PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", //
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+@SuppressWarnings("java:S3577") // Rename class
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = { //
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.google.gson.JsonParser;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
+import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.r1.ConsumerJobInfo;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+@SuppressWarnings("java:S3577") // Rename class
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@TestPropertySource(properties = { //
+ "server.ssl.key-store=./config/keystore.jks", //
+ "app.webclient.trust-store=./config/truststore.jks", //
+ "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"//
+})
+class IntegrationWithKafka {
+
+ @Autowired
+ private ApplicationConfig applicationConfig;
+
+ @Autowired
+ private Jobs jobs;
+
+ @Autowired
+ private InfoTypes types;
+
+ @Autowired
+ private ConsumerController consumerController;
+
+ @Autowired
+ private EcsSimulatorController ecsSimulatorController;
+
+ @Autowired
+ private KafkaTopicConsumers kafkaTopicConsumers;
+
+ private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
+ private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+ @LocalServerPort
+ int localServerHttpPort;
+
+ static class TestApplicationConfig extends ApplicationConfig {
+ @Override
+ public String getEcsBaseUrl() {
+ return thisProcessUrl();
+ }
+
+ @Override
+ public String getDmaapBaseUrl() {
+ return thisProcessUrl();
+ }
+
+ @Override
+ public String getSelfUrl() {
+ return thisProcessUrl();
+ }
+
+ private String thisProcessUrl() {
+ final String url = "https://localhost:" + getLocalServerHttpPort();
+ return url;
+ }
+ }
+
+ /**
+ * Overrides the BeanFactory.
+ */
+ @TestConfiguration
+ static class TestBeanFactory extends BeanFactory {
+
+ @Override
+ @Bean
+ public ServletWebServerFactory servletContainer() {
+ return new TomcatServletWebServerFactory();
+ }
+
+ @Override
+ @Bean
+ public ApplicationConfig getApplicationConfig() {
+ TestApplicationConfig cfg = new TestApplicationConfig();
+ return cfg;
+ }
+ }
+
+ @AfterEach
+ void reset() {
+ this.consumerController.testResults.reset();
+ this.ecsSimulatorController.testResults.reset();
+ this.jobs.clear();
+ }
+
+ private AsyncRestClient restClient(boolean useTrustValidation) {
+ WebClientConfig config = this.applicationConfig.getWebClientConfig();
+ HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
+ .httpProxyHost("") //
+ .httpProxyPort(0) //
+ .build();
+ config = ImmutableWebClientConfig.builder() //
+ .keyStoreType(config.keyStoreType()) //
+ .keyStorePassword(config.keyStorePassword()) //
+ .keyStore(config.keyStore()) //
+ .keyPassword(config.keyPassword()) //
+ .isTrustStoreUsed(useTrustValidation) //
+ .trustStore(config.trustStore()) //
+ .trustStorePassword(config.trustStorePassword()) //
+ .httpProxyConfig(httpProxyConfig).build();
+
+ AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+ return restClientFactory.createRestClientNoHttpProxy(baseUrl());
+ }
+
+ private AsyncRestClient restClient() {
+ return restClient(false);
+ }
+
+ private String baseUrl() {
+ return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
+ }
+
+ private Object jobParametersAsJsonObject(String filter, long maxTimeMiliseconds, int maxSize, int maxConcurrency) {
+ Job.Parameters param =
+ new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds), maxConcurrency);
+ String str = gson.toJson(param);
+ return jsonObject(str);
+ }
+
+ private Object jsonObject(String json) {
+ try {
+ return JsonParser.parseString(json).getAsJsonObject();
+ } catch (Exception e) {
+ throw new NullPointerException(e.toString());
+ }
+ }
+
+ private ConsumerJobInfo consumerJobInfo(String filter, Duration maxTime, int maxSize, int maxConcurrency) {
+ try {
+ InfoType type = this.types.getAll().iterator().next();
+ String typeId = type.getId();
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ return new ConsumerJobInfo(typeId,
+ jobParametersAsJsonObject(filter, maxTime.toMillis(), maxSize, maxConcurrency), "owner", targetUri,
+ "");
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private SenderOptions<Integer, String> senderOptions() {
+ String bootstrapServers = this.applicationConfig.getKafkaBootStrapServers();
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return SenderOptions.create(props);
+ }
+
+ private SenderRecord<Integer, String, Integer> senderRecord(String data, int i) {
+ final InfoType infoType = this.types.getAll().iterator().next();
+ return SenderRecord.create(new ProducerRecord<>(infoType.getKafkaInputTopic(), i, data + i), i);
+ }
+
+ private void sendDataToStream(Flux<SenderRecord<Integer, String, Integer>> dataToSend) {
+ final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+
+ sender.send(dataToSend) //
+ .doOnError(e -> logger.error("Send failed", e)) //
+ .blockLast();
+
+ }
+
+ private void verifiedReceivedByConsumer(String... strings) {
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(strings.length));
+ for (String s : strings) {
+ assertTrue(consumer.hasReceived(s));
+ }
+ }
+
+ @Test
+ void kafkaIntegrationTest() throws InterruptedException {
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+ // Create two jobs. One buffering and one with a filter
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+ restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+ var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+
+ // Delete the jobs
+ this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+ }
+
+ @Test
+ void kafkaIOverflow() throws InterruptedException {
+ // This does not work. After an overflow, the kafka stream does not seem to work
+ //
+ final String JOB_ID1 = "ID1";
+ final String JOB_ID2 = "ID2";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+ // Create two jobs.
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID1, restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+ var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
+ sendDataToStream(dataToSend); // this will overflow
+
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+ await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
+ this.consumerController.testResults.reset();
+
+ kafkaTopicConsumers.restartNonRunningTasks();
+
+ dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message__1", "Message__1");
+ }
+
+}
{
"id": "ExampleInformationType",
"dmaapTopicUrl": "/dmaap-topic-1",
- "useHttpProxy": true
+ "useHttpProxy": false
}
]
}
\ No newline at end of file
--- /dev/null
+{
+ "types": [
+ {
+ "id": "ExampleInformationType",
+ "kafkaInputTopic": "TutorialTopic",
+ "useHttpProxy": false
+ }
+ ]
+}
\ No newline at end of file
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
}
public Mono<String> post(String uri, @Nullable String body) {
return postForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
- .flatMap(this::toBody);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: <empty>", traceTag);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> get(String uri) {
return getForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> delete(String uri) {
return deleteForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
}
}
- private Mono<String> toBody(ResponseEntity<String> entity) {
+ private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
- return Mono.just("");
+ return "";
} else {
- return Mono.just(entity.getBody());
+ return entity.getBody();
}
}
.build();
}
- private Mono<WebClient> getWebClient() {
+ private WebClient getWebClient() {
if (this.webClient == null) {
this.webClient = buildWebClient(baseUrl);
}
- return Mono.just(buildWebClient(baseUrl));
+ return this.webClient;
}
-
}
return validatePutEiJob(eiJobId, eiJobObject) //
.flatMap(this::startEiJob) //
.doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
- .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
.onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.INTERNAL_SERVER_ERROR)));
}
return this.producerCallbacks.startInfoSubscriptionJob(newEiJob, infoProducers) //
.doOnNext(noOfAcceptingProducers -> this.logger.debug(
"Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) //
- .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob));
+ .map(noOfAcceptingProducers -> newEiJob);
}
private Mono<InfoJob> validatePutEiJob(String eiJobId, A1eEiJobInfo eiJobInfo) {
return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) //
.flatMap(this::startInfoSubscriptionJob) //
.doOnNext(this.infoJobs::put) //
- .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
.onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
}
return this.producerCallbacks.startInfoSubscriptionJob(newInfoJob, infoProducers) //
.doOnNext(noOfAcceptingProducers -> this.logger.debug("Started job {}, number of activated producers: {}",
newInfoJob.getId(), noOfAcceptingProducers)) //
- .flatMap(noOfAcceptingProducers -> Mono.just(newInfoJob));
+ .map(noOfAcceptingProducers -> newInfoJob);
}
private Mono<InfoJob> validatePutInfoJob(String jobId, ConsumerJobInfo jobInfo, boolean performTypeCheck) {
return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
.flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
.collectList() //
- .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+ .map(okResponses -> Integer.valueOf(okResponses.size())); //
}
/**
private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
SubscriptionInfo subscriptionInfo) {
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
- return Mono.just(1) //
- .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+ return notifyFunc.apply(subscriptionInfo) //
.retryWhen(retrySpec) //
.onErrorResume(throwable -> {
logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
})//
.doOnNext(response -> handleRespondingProducer(response, producer))
.flatMap(response -> checkProducerJobs(producer)) //
- .flatMap(responses -> Mono.just(producer));
+ .map(responses -> producer);
}
private Mono<?> checkProducerJobs(InfoProducer producer) {
// Test that subscriptions are removed for a unresponsive consumer
// PUT a subscription with a junk callback
- final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "/JUNK", "owner");
String body = gson.toJson(info);
restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);