import org.oran.dmaapadapter.repository.Jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
* The class fetches incoming requests from DMAAP and sends them further to the
* consumers that has a job for this InformationType.
*/
-
public class DmaapTopicConsumer {
private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
- .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
+ .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
}