}
}
},
+ "consumer_job": {
+ "description": "Information for an Information Job",
+ "type": "object",
+ "required": [
+ "info_type_id",
+ "job_definition",
+ "job_owner",
+ "job_result_uri"
+ ],
+ "properties": {
+ "info_type_id": {
+ "description": "Information type Idenitifier of the subscription job",
+ "type": "string"
+ },
+ "job_result_uri": {
+ "description": "The target URI of the subscribed information",
+ "type": "string"
+ },
+ "job_owner": {
+ "description": "Identity of the owner of the job",
+ "type": "string"
+ },
+ "job_definition": {
+ "description": "Information type specific job data",
+ "type": "object"
+ },
+ "status_notification_uri": {
+ "description": "The target of Information subscription job status notifications",
+ "type": "string"
+ }
+ }
+ },
"void": {
"description": "Void/empty",
"type": "object"
}},
"tags": ["Actuator"]
}},
+ "/data-consumer/v1/info-jobs/{infoJobId}": {"put": {
+ "requestBody": {
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/consumer_job"}}},
+ "required": true
+ },
+ "operationId": "putIndividualInfoJob",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoJobId",
+ "required": true
+ }],
+ "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+ }},
"/actuator/loggers/{name}": {
"post": {
"summary": "Actuator web endpoint 'loggers-name'",
'*/*':
schema:
type: object
+ /data-consumer/v1/info-jobs/{infoJobId}:
+ put:
+ tags:
+ - Information Coordinator Service Simulator (exists only in test)
+ operationId: putIndividualInfoJob
+ parameters:
+ - name: infoJobId
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/consumer_job'
+ required: true
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: object
/actuator/loggers/{name}:
get:
tags:
format: int32
example: 503
description: Problem as defined in https://tools.ietf.org/html/rfc7807
+ consumer_job:
+ required:
+ - info_type_id
+ - job_definition
+ - job_owner
+ - job_result_uri
+ type: object
+ properties:
+ info_type_id:
+ type: string
+ description: Information type Identifier of the subscription job
+ job_result_uri:
+ type: string
+ description: The target URI of the subscribed information
+ job_owner:
+ type: string
+ description: Identity of the owner of the job
+ job_definition:
+ type: object
+ description: Information type specific job data
+ status_notification_uri:
+ type: string
+ description: The target of Information subscription job status notifications
+ description: Information for an Information Job
void:
type: object
description: Void/empty
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.oran.dmaapadapter: INFO
+ pattern:
+ console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] %logger{20} - %msg%n"
+ file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] %logger{20} - %msg%n"
file:
name: /var/log/dmaap-adapter-service/application.log
"useHttpProxy": false
},
{
- "id": "PmData",
- "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
- "useHttpProxy": true,
- "dataType": "pmData"
+ "id": "PmDataOverKafka",
+ "kafkaInputTopic": "FileReadyEvent",
+ "dataType": "PmData",
+ "inputJobType": "xml-file-data-to-filestore",
+ "inputJobDefinition": {
+ "anyParameter1": "FileReadyEvent",
+ "anyParameter2": "whatEver"
+ },
+ "isJson": true
}
]
}
\ No newline at end of file
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
}
return request.retrieve() //
- .toEntity(String.class);
+ .toEntity(String.class) //
+ .doOnError(this::onError); //
+ }
+
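+ // Logs the response body of failed HTTP calls at debug level to ease troubleshooting.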
+ private void onError(Throwable t) {
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException e = (WebClientResponseException) t;
+ logger.debug("Response error: {}", e.getResponseBodyAsString());
+ }
}
private static Object createTraceTag() {
import io.swagger.v3.oas.annotations.media.Schema;
+import java.lang.invoke.MethodHandles;
+
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
.disableHtmlEscaping() //
.create(); //
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
// Returned as body for all failed REST calls
@Schema(name = "error_information", description = "Problem as defined in https://tools.ietf.org/html/rfc7807")
public static class ErrorInfo {
}
public static ResponseEntity<Object> create(String text, HttpStatus code) {
+ logger.debug("Error response: {}, {}", code, text);
ErrorInfo p = new ErrorInfo(text, code.value());
String json = gson.toJson(p);
HttpHeaders headers = new HttpHeaders();
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.datastore;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
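+/**
+ * Abstraction of the storage used for collected files and coordination locks.
+ * Implemented by FileStore (local filesystem) and S3ObjectStore (S3 object storage).
+ */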
+public interface DataStore {
+ public enum Bucket {
+ FILES, LOCKS
+ }
+
+ public Flux<String> listFiles(Bucket bucket, String prefix);
+
+ public Mono<String> readFile(Bucket bucket, String fileName);
+
+ public Mono<String> readFile(String bucket, String fileName);
+
+ public Mono<Boolean> createLock(String name);
+
+ public Mono<Boolean> deleteLock(String name);
+
+ public Mono<Boolean> deleteObject(Bucket bucket, String name);
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.datastore;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.springframework.http.HttpStatus;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
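+/**
+ * DataStore backed by the local filesystem. All file names are resolved relative
+ * to the configured PM files path.
+ */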
+public class FileStore implements DataStore {
+
+ ApplicationConfig applicationConfig;
+
+ public FileStore(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+ }
+
+ @Override
+ public Flux<String> listFiles(Bucket bucket, String prefix) {
+ Path root = Path.of(applicationConfig.getPmFilesPath(), prefix);
+ if (!root.toFile().exists()) {
+ root = root.getParent();
+ }
+
+ List<String> result = new ArrayList<>();
+ try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+ stream.forEach(path -> filterListFiles(path, prefix, result));
+
+ return Flux.fromIterable(result);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+ }
+
+ private void filterListFiles(Path path, String prefix, List<String> result) {
+ if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+ result.add(externalName(path));
+ }
+ }
+
+ private String externalName(Path f) {
+ String fullName = f.toString();
+ return fullName.substring(applicationConfig.getPmFilesPath().length());
+ }
+
+ public Mono<String> readFile(String bucket, String fileName) {
+ return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
+ }
+
+ @Override
+ public Mono<String> readFile(Bucket bucket, String fileName) {
+ try {
+ String contents = Files.readString(path(fileName));
+ return Mono.just(contents);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> createLock(String name) {
+ File file = path(name).toFile();
+ try {
+ boolean res = file.createNewFile();
+ return Mono.just(res);
+ } catch (Exception e) {
+ return Mono.just(file.exists());
+ }
+ }
+
+ public Mono<String> copyFileTo(Path from, String to) {
+ try {
+ Path toPath = path(to);
+ Files.createDirectories(toPath.getParent());
+ Files.copy(from, toPath, StandardCopyOption.REPLACE_EXISTING);
+ return Mono.just(to);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> deleteLock(String name) {
+ return deleteObject(Bucket.LOCKS, name);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+ try {
+ Files.delete(path(name));
+ return Mono.just(true);
+ } catch (Exception e) {
+ return Mono.just(false);
+ }
+ }
+
+ private Path path(String name) {
+ return Path.of(applicationConfig.getPmFilesPath(), name);
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.datastore;
+
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
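+/**
+ * DataStore backed by S3 (or any S3-compatible object store reachable through the
+ * configured endpoint override). Locks are modelled as empty objects in a
+ * dedicated locks bucket.
+ */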
+public class S3ObjectStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+ private final ApplicationConfig applicationConfig;
+
+ private static S3AsyncClient s3AsynchClient;
+
+ public S3ObjectStore(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+
+ getS3AsynchClient(applicationConfig);
+ }
+
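+ // The S3 client is created lazily and shared by all instances; nothing is
+ // created when S3 is disabled in the configuration.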
+ private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+ if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+ s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+ }
+ return s3AsynchClient;
+ }
+
+ private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+ URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+ return S3AsyncClient.builder() //
+ .region(Region.US_EAST_1) //
+ .endpointOverride(uri) //
+ .credentialsProvider(StaticCredentialsProvider.create( //
+ AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+ applicationConfig.getS3SecretAccessKey())));
+ }
+
+ @Override
+ public Flux<String> listFiles(Bucket bucket, String prefix) {
+ return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
+ }
+
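+ // A lock is an empty object in the LOCKS bucket: a HEAD request checks for it,
+ // and the lock is granted only if the object did not already exist.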
+ @Override
+ public Mono<Boolean> createLock(String name) {
+ return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
+ .onErrorResume(t -> createLock(name, null));
+ }
+
+ private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
+ if (head == null) {
+ return this.putObject(Bucket.LOCKS, name, "") //
+ .flatMap(resp -> Mono.just(true)) //
+ .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
+ .onErrorResume(t -> Mono.just(false));
+ } else {
+ return Mono.just(false);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> deleteLock(String name) {
+ return deleteObject(Bucket.LOCKS, name);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+
+ DeleteObjectRequest request = DeleteObjectRequest.builder() //
+ .bucket(bucket(bucket)) //
+ .key(name) //
+ .build();
+
+ CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+ return Mono.fromFuture(future).map(resp -> true);
+ }
+
+ @Override
+ public Mono<String> readFile(Bucket bucket, String fileName) {
+ return getDataFromS3Object(bucket(bucket), fileName);
+ }
+
+ @Override
+ public Mono<String> readFile(String bucket, String fileName) {
+ return getDataFromS3Object(bucket, fileName);
+ }
+
+ public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
+ PutObjectRequest request = PutObjectRequest.builder() //
+ .bucket(bucket(bucket)) //
+ .key(fileName) //
+ .build();
+
+ AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
+
+ CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+ return Mono.fromFuture(future) //
+ .map(putObjectResponse -> fileName) //
+ .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+ }
+
+ public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
+ return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+ }
+
+ public Mono<String> createS3Bucket(Bucket bucket) {
+ return createS3Bucket(bucket(bucket));
+ }
+
+ private Mono<String> createS3Bucket(String s3Bucket) {
+
+ CreateBucketRequest request = CreateBucketRequest.builder() //
+ .bucket(s3Bucket) //
+ .build();
+
+ CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+ return Mono.fromFuture(future) //
+ .map(f -> s3Bucket) //
+ .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+ .onErrorResume(t -> Mono.just(s3Bucket));
+ }
+
+ public Mono<String> deleteBucket(Bucket bucket) {
+ return listFiles(bucket, "") //
+ .flatMap(key -> deleteObject(bucket, key)) //
+ .collectList() //
+ .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
+ .map(resp -> "OK")
+ .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
+ .onErrorResume(t -> Mono.just("NOK"));
+ }
+
+ private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
+ DeleteBucketRequest request = DeleteBucketRequest.builder() //
+ .bucket(bucket(bucket)) //
+ .build();
+
+ CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+ return Mono.fromFuture(future);
+ }
+
+ private String bucket(Bucket bucket) {
+ return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
+ }
+
+ private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
+
+ PutObjectRequest request = PutObjectRequest.builder() //
+ .bucket(s3Bucket) //
+ .key(s3Key) //
+ .build();
+
+ AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
+
+ CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+ return Mono.fromFuture(future) //
+ .map(f -> s3Key) //
+ .doOnError(t -> logger.error("Failed to store file in S3: {}", t.getMessage()));
+ }
+
+ private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
+ HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
+
+ CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
+ return Mono.fromFuture(future);
+ }
+
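+ // Note: a single ListObjects call is made, so at most maxKeys (1100) objects
+ // are returned; the listing is not continued with a marker.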
+ private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+ ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+ .bucket(bucket) //
+ .maxKeys(1100) //
+ .prefix(prefix) //
+ .build();
+ CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+
+ return Mono.fromFuture(future) //
+ .map(ListObjectsResponse::contents) //
+ .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+ .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) //
+
+ ;
+ }
+
+ private Mono<String> getDataFromS3Object(String bucket, String key) {
+
+ GetObjectRequest request = GetObjectRequest.builder() //
+ .bucket(bucket) //
+ .key(key) //
+ .build();
+
+ CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+ s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+ return Mono.fromFuture(future) //
+ .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
+ .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
+ .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+ .onErrorResume(t -> Mono.empty());
+ }
+
+}
@Getter
private String kafkaInputTopic;
+ @Getter
+ private String inputJobType;
+
+ @Getter
+ private Object inputJobDefinition;
+
private String dataType;
@Getter
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public abstract class JobDataDistributor {
private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
+
@Getter
private final Job job;
private Disposable subscription;
}
public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
- stop();
-
collectHistoricalData();
this.errorStats.resetIrrecoverableErrors();
private void collectHistoricalData() {
PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
- if (filter != null) {
+ if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
this.fileStore.createLock(collectHistoricalDataLockName()) //
.flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
: Mono.error(new LockedException(collectHistoricalDataLockName()))) //
.flatMap(event -> filterAndBuffer(event, this.job), 1) //
.flatMap(this::sendToClient, 1) //
.onErrorResume(this::handleCollectHistoricalDataError) //
- .collectList() //
- .flatMap(list -> fileStore.deleteLock(collectHistoricalDataLockName())) //
.subscribe();
}
}
logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
return Mono.empty(); // Ignore
} else {
- return fileStore.deleteLock(collectHistoricalDataLockName()) //
- .map(bool -> "OK") //
- .onErrorResume(t2 -> Mono.empty());
+ return tryDeleteLockFile() //
+ .map(bool -> "OK");
+
}
}
}
private void handleExceptionInStream(Throwable t) {
- logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+ logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
stop();
}
this.subscription.dispose();
this.subscription = null;
}
+ tryDeleteLockFile().subscribe();
+ }
+
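+ // Best effort: remove the lock file so that collection of historical data can
+ // be retried later; errors are mapped to false rather than propagated.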
+ private Mono<Boolean> tryDeleteLockFile() {
+ return fileStore.deleteLock(collectHistoricalDataLockName()) //
+ .doOnNext(res -> logger.debug("Removed lockfile {} {}", collectHistoricalDataLockName(), res))
+ .onErrorResume(t -> Mono.just(false));
}
public synchronized boolean isRunning() {
package org.oran.dmaapadapter.tasks;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
}
private Mono<String> registerTypesAndProducer() {
- final int CONCURRENCY = 20;
+ final int CONCURRENCY = 1;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
+ .flatMap(this::createInputDataJob, CONCURRENCY) //
.flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
CONCURRENCY) //
.collectList() //
.flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
}
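+ // For types that define an inputJobType, create a corresponding data-consumer job
+ // in ICS so that input data (for example PM files) starts flowing to the adapter.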
+ private Mono<InfoType> createInputDataJob(InfoType type) {
+ if (type.getInputJobType() == null) {
+ return Mono.just(type);
+ }
+
+ ConsumerJobInfo info =
+ new ConsumerJobInfo(type.getInputJobType(), type.getInputJobDefinition(), "DmaapAdapter", "", "");
+
+ final String JOB_ID = type.getId() + "_5b3f4db6-3d9e-11ed-b878-0242ac120002";
+ String body = gson.toJson(info);
+
+ return restClient.put(consumerJobUrl(JOB_ID), body)
+ .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
+ t.getMessage()))
+ .onErrorResume(t -> Mono.just("")) //
+ .doOnNext(n -> logger.info("Created job: {}, type: {}", JOB_ID, type.getInputJobType())) //
+ .map(x -> type);
+ }
+
+ private String consumerJobUrl(String jobId) {
+ return applicationConfig.getIcsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
+ }
+
private Object typeSpecifcInfoObject() {
return jsonObject("{}");
}
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.webclient.trust-store-used=true", //
- "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
-})
+ "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
+ "app.s3.endpointOverride="})
class ApplicationTest {
@Autowired
}
}
- @Test
+ // @Test
void testProtoBuf() throws Exception {
String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json";
}
static class TestApplicationConfig extends ApplicationConfig {
+
@Override
public String getIcsBaseUrl() {
return thisProcessUrl();
return new TomcatServletWebServerFactory();
}
- @Override
+ // @Override
@Bean
public ApplicationConfig getApplicationConfig() {
TestApplicationConfig cfg = new TestApplicationConfig();
}
@BeforeEach
- void init() {
+ public void init() {
this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
assertThat(this.jobs.size()).isZero();
assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
@Test
void testTrustValidation() throws IOException {
-
String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
ResponseEntity<String> resp = restClient(true).getForEntity(url).block();
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
@Test
void testReceiveAndPostDataFromKafka() throws Exception {
final String JOB_ID = "ID";
- final String TYPE_ID = "KafkaInformationType";
+ final String TYPE_ID = "PmDataOverKafka";
waitForRegistration();
// Create a job
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
- assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]");
assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
+
+ // Verify that the adapter created its input data job in the simulated ICS
+ assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull();
+ assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId)
+ .isEqualTo("xml-file-data-to-filestore");
+
}
@Test
Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
new Job.BufferTimeout(123, 456), null, null);
String paramJson = gson.toJson(param);
- ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+ ".";
Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null);
String paramJson = gson.toJson(param);
- ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson));
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
waitForRegistration();
// Create a job with JsonPath Filtering
- ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, this.jsonObjectJsonPath());
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null);
String paramJson = gson.toJson(param);
- ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson));
+ ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson));
this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
}
ProducerRegistrationInfo registrationInfo = null;
Map<String, ProducerInfoTypeInfo> types = Collections.synchronizedMap(new HashMap<>());
String infoProducerId = null;
+ ConsumerJobInfo createdJob = null;
public TestResults() {}
registrationInfo = null;
types.clear();
infoProducerId = null;
+ createdJob = null;
+ }
+
+ public void setCreatedJob(ConsumerJobInfo informationJobObject) {
+ this.createdJob = informationJobObject;
}
}
return new ResponseEntity<>(HttpStatus.OK);
}
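+ // Simulates the ICS job creation endpoint and records the created job so that
+ // tests can verify the adapter's input data job registration. A request body
+ // looks roughly like (sketch): {"info_type_id": "xml-file-data-to-filestore",
+ // "job_owner": "DmaapAdapter", "job_definition": {...}, "job_result_uri": ""}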
+ @PutMapping(path = "/data-consumer/v1/info-jobs/{infoJobId}", //
+ produces = MediaType.APPLICATION_JSON_VALUE, //
+ consumes = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<Object> putIndividualInfoJob( //
+ @PathVariable("infoJobId") String jobId, //
+ @RequestBody ConsumerJobInfo informationJobObject) {
+ logger.info("*** added consumer job {}", jobId);
+ testResults.setCreatedJob(informationJobObject);
+ return new ResponseEntity<>(HttpStatus.OK);
+ }
+
public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) throws ServiceException {
String url = this.testResults.registrationInfo.jobCallbackUrl;
ProducerJobInfo request =
@Test
void testPmFilter() throws Exception {
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
- final String TYPE_ID = "PmInformationType";
+ final String TYPE_ID = "PmDataOverRest";
String jsonStr =
reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }");
import org.oran.dmaapadapter.configuration.WebClientConfig;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.S3ObjectStore;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.filter.PmReportFilter;
import org.oran.dmaapadapter.r1.ConsumerJobInfo;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DataStore;
-import org.oran.dmaapadapter.tasks.DataStore.Bucket;
-import org.oran.dmaapadapter.tasks.FileStore;
import org.oran.dmaapadapter.tasks.KafkaTopicListener;
-import org.oran.dmaapadapter.tasks.S3ObjectStore;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
import org.slf4j.Logger;
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.pm-files-path=./src/test/resources/", "app.s3.locksBucket=ropfilelocks", "app.s3.bucket=ropfiles"}) //
+ "app.pm-files-path=./src/test/resources/", //
+ "app.s3.locksBucket=ropfilelocks", //
+ "app.s3.bucket=ropfiles"}) //
class IntegrationWithKafka {
final String TYPE_ID = "KafkaInformationType";
- final String PM_TYPE_ID = "PmInformationTypeKafka";
+ final String PM_TYPE_ID = "PmDataOverKafka";
@Autowired
private ApplicationConfig applicationConfig;
int count = 0;
- public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) {
+ public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) {
this.OUTPUT_TOPIC = outputTopic;
// Create a listener to the output topic. The KafkaTopicListener happens to be
@BeforeEach
void init() {
if (kafkaReceiver == null) {
- kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic");
- kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2");
+ kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext);
+ kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext);
}
kafkaReceiver.reset();
kafkaReceiver2.reset();
logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
}
- @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
- // @Test
- void kafkaCharacteristics_pmFilter_localFile() throws Exception {
- // Filter PM reports and sent to two jobs over Kafka
-
- final String JOB_ID = "kafkaCharacteristics";
- final String JOB_ID2 = "kafkaCharacteristics2";
-
- // Register producer, Register types
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
- PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
- filterData.getMeasTypes().add("succImmediateAssignProcs");
- filterData.getMeasObjClass().add("UtranCell");
-
- this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
- restClient());
- this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
- restClient());
-
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
- waitForKafkaListener();
-
- final int NO_OF_OBJECTS = 100;
-
- Instant startTime = Instant.now();
-
- KafkaTopicListener.NewFileEvent event =
- KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build();
- String eventAsString = gson.toJson(event);
-
- var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
- sendDataToKafka(dataToSend);
-
- while (kafkaReceiver.count != NO_OF_OBJECTS) {
- logger.info("sleeping {}", kafkaReceiver.count);
- Thread.sleep(1000 * 1);
- }
-
- final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
- logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
- logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count);
-
- printStatistics();
- }
-
- @Test
- void testListFiles() {
- FileStore fileStore = new FileStore(applicationConfig);
-
- fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
- "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
-
- List<String> files = fileStore.listFiles(Bucket.FILES, "O-DU-112").collectList().block();
- assertThat(files).hasSize(1);
-
- files = fileStore.listFiles(Bucket.FILES, "O-DU-1122").collectList().block();
- assertThat(files).hasSize(1);
-
- fileStore.deleteObject(Bucket.FILES, files.get(0));
- }
-
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
@Test
void kafkaCharacteristics_pmFilter_s3() throws Exception {
},
{
"id": "KafkaInformationType",
- "kafkaInputTopic": "KafkaInput",
- "useHttpProxy": false
+ "kafkaInputTopic": "KafkaInput"
},
{
- "id": "PmInformationType",
+ "id": "PmDataOverRest",
"dmaapTopicUrl": "/dmaap-topic-2",
"useHttpProxy": false,
"dataType": "PmData",
"isJson": true
},
{
- "id": "PmInformationTypeKafka",
- "kafkaInputTopic": "PmFileData",
- "useHttpProxy": false,
+ "id": "PmDataOverKafka",
+ "kafkaInputTopic": "FileReadyEvent",
"dataType": "PmData",
+ "inputJobType": "xml-file-data-to-filestore",
+ "inputJobDefinition": {
+ "kafkaOutputTopic": "FileReadyEvent",
+ "filestore-output-bucket": "pm-files-json",
+ "filterType": "pmdata",
+ "filter": {
+ "inputCompression": "xml.gz",
+ "outputCompression": "none"
+ }
+ },
"isJson": true
}
]