import org.junit.jupiter.api.Test;
import org.oran.pmproducer.clients.AsyncRestClient;
import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.configuration.WebClientConfig;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
import org.oran.pmproducer.datastore.DataStore;
import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.r1.ConsumerJobInfo;
import org.oran.pmproducer.repository.InfoType;
import org.oran.pmproducer.repository.InfoTypes;
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
"app.pm-files-path=./src/test/resources/", //
"app.s3.locksBucket=ropfilelocks", //
- "app.pm-files-path=/tmp/dmaapadaptor", //
- "app.s3.bucket=dmaaptest" //
-}) //
+ "app.pm-files-path=/tmp/pmproducer", //
+ "app.s3.bucket=pmproducertest", //
+ "app.auth-token-file=src/test/resources/jwtToken.b64", //
+ "app.kafka.use-oath-token=false"}) //
+/**
+ * Integration tests for interworking with Kafka and Minio.
+ * Requires that Kafka and ICS are started.
+ */
class IntegrationWithKafka {
final static String PM_TYPE_ID = "PmDataOverKafka";
return thisProcessUrl();
}
- @Override
- public String getDmaapBaseUrl() {
- return thisProcessUrl();
- }
-
@Override
public String getSelfUrl() {
return thisProcessUrl();
if (logger.isDebugEnabled()) {
logger.debug("*** received data on topic: {}", OUTPUT_TOPIC);
logger.debug("*** received typeId: {}", receivedKafkaOutput.getTypeIdFromHeaders());
+ logger.debug("*** received sourceName: {}", receivedKafkaOutput.getSourceNameFromHeaders());
}
}
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+ // Security
+ this.applicationConfig.addKafkaSecurityProps(props);
+
return SenderOptions.create(props);
}
String msgString = kafkaReceiver.receivedKafkaOutput.valueAsString();
assertThat(msgString).contains("pmCounterNumber0");
assertThat(msgString).doesNotContain("pmCounterNumber1");
+ assertThat(kafkaReceiver.receivedKafkaOutput.getTypeIdFromHeaders()).isEqualTo(PM_TYPE_ID);
+ assertThat(kafkaReceiver.receivedKafkaOutput.getSourceNameFromHeaders()).isEqualTo("HTTPST2-0"); // This is from
+ // the file
printCharacteristicsResult("kafkaCharacteristics_pmFilter_s3", startTime, NO_OF_OBJECTS);
logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
@Test
void testHistoricalData() throws Exception {
- // test
+        // Verify that data which has already been fetched can be retrieved again
final String JOB_ID = "testHistoricalData";
DataStore fileStore = dataStore();