/*-
* ============LICENSE_START======================================================================
- * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2023 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* ============LICENSE_END========================================================================
*/
-package org.onap.dcaegen2.collectors.datafile.tasks;
+package org.oran.datafile.tasks;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
-import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.oran.datafile.commons.Scheme;
+import org.oran.datafile.configuration.AppConfig;
+import org.oran.datafile.configuration.CertificateConfig;
+import org.oran.datafile.datastore.DataStore;
+import org.oran.datafile.datastore.DataStore.Bucket;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.oran.datafile.http.HttpsClientConnectionManagerUtil;
+import org.oran.datafile.model.Counters;
+import org.oran.datafile.model.FileData;
+import org.oran.datafile.model.FilePublishInformation;
+import org.oran.datafile.model.FileReadyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
.flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
.flatMap(this::filterNotFetched, false, 1, 1) //
.flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
+ .flatMap(data -> reportFetchedFile(data, appConfig.getCollectedFileTopic()), false, 1) //
.sequential() //
.doOnError(t -> logger.error("Received error: {}", t.toString())); //
}
}
private Path locaFilePath(FilePublishInformation info) {
- return Paths.get(this.appConfig.collectedFilesPath, info.getName());
+ return Paths.get(appConfig.getCollectedFilesPath(), info.getName());
}
private void deleteLocalFile(FilePublishInformation info) {
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ this.appConfig.addKafkaSecurityProps(props);
return SenderOptions.create(props);
}
* polling/blocking fashion.
*/
private Flux<FileReadyMessage> fetchFromKafka() {
- KafkaTopicListener listener = new KafkaTopicListener(this.appConfig.getKafkaBootStrapServers(),
- this.appConfig.kafkaClientId, this.appConfig.fileReadyEventTopic);
+ KafkaTopicListener listener = new KafkaTopicListener(this.appConfig);
return listener.getFlux() //
.flatMap(this::parseReceivedFileReadyMessage, 1);