From: PatrikBuhr Date: Fri, 23 Sep 2022 13:06:40 +0000 (+0200) Subject: Auto create of its own data subscription. X-Git-Tag: 1.2.0~14 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F16%2F9116%2F5;p=nonrtric%2Fplt%2Fdmaapadapter.git Auto create of its own data subscription. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-773 Change-Id: I2db13a7281700ef0a9a0d4396191813c56cb399a --- diff --git a/api/api.json b/api/api.json index e8ea0c8..4c95af9 100644 --- a/api/api.json +++ b/api/api.json @@ -48,6 +48,38 @@ } } }, + "consumer_job": { + "description": "Information for an Information Job", + "type": "object", + "required": [ + "info_type_id", + "job_definition", + "job_owner", + "job_result_uri" + ], + "properties": { + "info_type_id": { + "description": "Information type Idenitifier of the subscription job", + "type": "string" + }, + "job_result_uri": { + "description": "The target URI of the subscribed information", + "type": "string" + }, + "job_owner": { + "description": "Identity of the owner of the job", + "type": "string" + }, + "job_definition": { + "description": "Information type specific job data", + "type": "object" + }, + "status_notification_uri": { + "description": "The target of Information subscription job status notifications", + "type": "string" + } + } + }, "void": { "description": "Void/empty", "type": "object" @@ -353,6 +385,24 @@ }}, "tags": ["Actuator"] }}, + "/data-consumer/v1/info-jobs/{infoJobId}": {"put": { + "requestBody": { + "content": {"application/json": {"schema": {"$ref": "#/components/schemas/consumer_job"}}}, + "required": true + }, + "operationId": "putIndividualInfoJob", + "responses": {"200": { + "description": "OK", + "content": {"application/json": {"schema": {"type": "object"}}} + }}, + "parameters": [{ + "schema": {"type": "string"}, + "in": "path", + "name": "infoJobId", + "required": true + }], + "tags": ["Information Coordinator Service Simulator (exists only in test)"] + }}, "/actuator/loggers/{name}": { "post": { "summary": "Actuator web endpoint 'loggers-name'", diff --git a/api/api.yaml b/api/api.yaml index bc10472..a73e3e5 100644 --- a/api/api.yaml +++ b/api/api.yaml @@ -293,6 +293,32 @@ paths: '*/*': schema: type: object + /data-consumer/v1/info-jobs/{infoJobId}: + put: + tags: + - Information Coordinator Service Simulator (exists only in test) + operationId: putIndividualInfoJob + parameters: + - name: infoJobId + in: path + required: true + style: simple + explode: false + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/consumer_job' + required: true + responses: + 200: + description: OK + content: + application/json: + schema: + type: object /actuator/loggers/{name}: get: tags: @@ -465,6 +491,30 @@ components: format: int32 example: 503 description: Problem as defined in https://tools.ietf.org/html/rfc7807 + consumer_job: + required: + - info_type_id + - job_definition + - job_owner + - job_result_uri + type: object + properties: + info_type_id: + type: string + description: Information type Idenitifier of the subscription job + job_result_uri: + type: string + description: The target URI of the subscribed information + job_owner: + type: string + description: Identity of the owner of the job + job_definition: + type: object + description: Information type specific job data + status_notification_uri: + type: string + description: The target of Information subscription job status notifications + description: Information for an Information Job void: type: object description: Void/empty diff --git a/config/application.yaml b/config/application.yaml index 74dc2be..8bdf414 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -39,6 +39,9 @@ logging: org.springframework.data: ERROR org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR org.oran.dmaapadapter: INFO + pattern: + console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] %logger{20} - %msg%n" + file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] %logger{20} - %msg%n" file: name: /var/log/dmaap-adapter-service/application.log diff --git a/config/application_configuration.json b/config/application_configuration.json index fbe6e6b..ab7de25 100644 --- a/config/application_configuration.json +++ b/config/application_configuration.json @@ -11,10 +11,15 @@ "useHttpProxy": false }, { - "id": "PmData", - "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12", - "useHttpProxy": true, - "dataType": "pmData" + "id": "PmDataOverKafka", + "kafkaInputTopic": "FileReadyEvent", + "dataType": "PmData", + "inputJobType": "xml-file-data-to-filestore", + "inputJobDefinition": { + "anyParameter1": "FileReadyEvent", + "anyParameter2": "whatEver" + }, + "isJson": true } ] } \ No newline at end of file diff --git a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index adbd32d..a6e2444 100644 --- a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -39,6 +39,7 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; +import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -122,7 +123,15 @@ public class AsyncRestClient { request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken())); } return request.retrieve() // - .toEntity(String.class); + .toEntity(String.class) // + .doOnError(this::onError); // + } + + private void onError(Throwable t) { + if (t instanceof WebClientResponseException) { + WebClientResponseException e = (WebClientResponseException) t; + logger.debug("Response error: {}", e.getResponseBodyAsString()); + } } private static Object createTraceTag() { diff --git a/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java b/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java index 6ce5473..bb7710c 100644 --- a/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java +++ b/src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java @@ -26,7 +26,11 @@ import com.google.gson.annotations.SerializedName; import io.swagger.v3.oas.annotations.media.Schema; +import java.lang.invoke.MethodHandles; + import org.oran.dmaapadapter.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -38,6 +42,8 @@ public class ErrorResponse { .disableHtmlEscaping() // .create(); // + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + // Returned as body for all failed REST calls @Schema(name = "error_information", description = "Problem as defined in https://tools.ietf.org/html/rfc7807") public static class ErrorInfo { @@ -91,6 +97,7 @@ public class ErrorResponse { } public static ResponseEntity create(String text, HttpStatus code) { + logger.debug("Error response: {}, {}", code, text); ErrorInfo p = new ErrorInfo(text, code.value()); String json = gson.toJson(p); HttpHeaders headers = new HttpHeaders(); diff --git a/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java new file mode 100644 index 0000000..167ff63 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java @@ -0,0 +1,43 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.datastore; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DataStore { + public enum Bucket { + FILES, LOCKS + } + + public Flux listFiles(Bucket bucket, String prefix); + + public Mono readFile(Bucket bucket, String fileName); + + public Mono readFile(String bucket, String fileName); + + public Mono createLock(String name); + + public Mono deleteLock(String name); + + public Mono deleteObject(Bucket bucket, String name); + +} diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java new file mode 100644 index 0000000..e78653c --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java @@ -0,0 +1,130 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.datastore; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.oran.dmaapadapter.exceptions.ServiceException; +import org.springframework.http.HttpStatus; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class FileStore implements DataStore { + + ApplicationConfig applicationConfig; + + public FileStore(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + } + + @Override + public Flux listFiles(Bucket bucket, String prefix) { + Path root = Path.of(applicationConfig.getPmFilesPath(), prefix); + if (!root.toFile().exists()) { + root = root.getParent(); + } + + List result = new ArrayList<>(); + try (Stream stream = Files.walk(root, Integer.MAX_VALUE)) { + + stream.forEach(path -> filterListFiles(path, prefix, result)); + + return Flux.fromIterable(result); + } catch (Exception e) { + return Flux.error(e); + } + } + + private void filterListFiles(Path path, String prefix, List result) { + if (path.toFile().isFile() && externalName(path).startsWith(prefix)) { + result.add(externalName(path)); + } + } + + private String externalName(Path f) { + String fullName = f.toString(); + return fullName.substring(applicationConfig.getPmFilesPath().length()); + } + + public Mono readFile(String bucket, String fileName) { + return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT)); + } + + @Override + public Mono readFile(Bucket bucket, String fileName) { + try { + String contents = Files.readString(path(fileName)); + return Mono.just(contents); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono createLock(String name) { + File file = path(name).toFile(); + try { + boolean res = file.createNewFile(); + return Mono.just(res); + } catch (Exception e) { + return Mono.just(file.exists()); + } + } + + public Mono copyFileTo(Path from, String to) { + try { + Path toPath = path(to); + Files.createDirectories(toPath); + Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING); + return Mono.just(to); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono deleteLock(String name) { + return deleteObject(Bucket.LOCKS, name); + } + + @Override + public Mono deleteObject(Bucket bucket, String name) { + try { + Files.delete(path(name)); + return Mono.just(true); + } catch (Exception e) { + return Mono.just(false); + } + } + + private Path path(String name) { + return Path.of(applicationConfig.getPmFilesPath(), name); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java new file mode 100644 index 0000000..c3b2f9d --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java @@ -0,0 +1,259 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2021 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.datastore; + +import java.net.URI; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; + +import org.oran.dmaapadapter.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class S3ObjectStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); + private final ApplicationConfig applicationConfig; + + private static S3AsyncClient s3AsynchClient; + + public S3ObjectStore(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + + getS3AsynchClient(applicationConfig); + } + + private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) { + if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { + s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); + } + return s3AsynchClient; + } + + private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) { + URI uri = URI.create(applicationConfig.getS3EndpointOverride()); + return S3AsyncClient.builder() // + .region(Region.US_EAST_1) // + .endpointOverride(uri) // + .credentialsProvider(StaticCredentialsProvider.create( // + AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // + applicationConfig.getS3SecretAccessKey()))); + + } + + @Override + public Flux listFiles(Bucket bucket, String prefix) { + return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key); + } + + @Override + public Mono createLock(String name) { + return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) // + .onErrorResume(t -> createLock(name, null)); + } + + private Mono createLock(String name, HeadObjectResponse head) { + if (head == null) { + + return this.putObject(Bucket.LOCKS, name, "") // + .flatMap(resp -> Mono.just(true)) // + .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) // + .onErrorResume(t -> Mono.just(false)); + } else { + return Mono.just(false); + } + } + + @Override + public Mono deleteLock(String name) { + return deleteObject(Bucket.LOCKS, name); + } + + @Override + public Mono deleteObject(Bucket bucket, String name) { + + DeleteObjectRequest request = DeleteObjectRequest.builder() // + .bucket(bucket(bucket)) // + .key(name) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteObject(request); + + return Mono.fromFuture(future).map(resp -> true); + } + + @Override + public Mono readFile(Bucket bucket, String fileName) { + return getDataFromS3Object(bucket(bucket), fileName); + } + + @Override + public Mono readFile(String bucket, String fileName) { + return getDataFromS3Object(bucket, fileName); + } + + public Mono putObject(Bucket bucket, String fileName, String bodyString) { + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(bucket(bucket)) // + .key(fileName) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromString(bodyString); + + CompletableFuture future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(putObjectResponse -> fileName) // + .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); + } + + public Mono copyFileToS3(Bucket bucket, Path fromFile, String toFile) { + return copyFileToS3Bucket(bucket(bucket), fromFile, toFile); + } + + public Mono createS3Bucket(Bucket bucket) { + return createS3Bucket(bucket(bucket)); + } + + private Mono createS3Bucket(String s3Bucket) { + + CreateBucketRequest request = CreateBucketRequest.builder() // + .bucket(s3Bucket) // + .build(); + + CompletableFuture future = s3AsynchClient.createBucket(request); + + return Mono.fromFuture(future) // + .map(f -> s3Bucket) // + .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) + .onErrorResume(t -> Mono.just(s3Bucket)); + } + + public Mono deleteBucket(Bucket bucket) { + return listFiles(bucket, "") // + .flatMap(key -> deleteObject(bucket, key)) // + .collectList() // + .flatMap(list -> deleteBucketFromS3Storage(bucket)) // + .map(resp -> "OK") + .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage())) + .onErrorResume(t -> Mono.just("NOK")); + } + + private Mono deleteBucketFromS3Storage(Bucket bucket) { + DeleteBucketRequest request = DeleteBucketRequest.builder() // + .bucket(bucket(bucket)) // + .build(); + + CompletableFuture future = s3AsynchClient.deleteBucket(request); + + return Mono.fromFuture(future); + } + + private String bucket(Bucket bucket) { + return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket(); + } + + private Mono copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) { + + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(s3Bucket) // + .key(s3Key) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromFile(fileName); + + CompletableFuture future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(f -> s3Key) // + .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage())); + + } + + private Mono getHeadObject(String bucket, String key) { + HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build(); + + CompletableFuture future = s3AsynchClient.headObject(request); + return Mono.fromFuture(future); + } + + private Flux listObjectsInBucket(String bucket, String prefix) { + ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() // + .bucket(bucket) // + .maxKeys(1100) // + .prefix(prefix) // + .build(); + CompletableFuture future = s3AsynchClient.listObjects(listObjectsRequest); + + return Mono.fromFuture(future) // + .map(ListObjectsResponse::contents) // + .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // + .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // + .flatMapMany(Flux::fromIterable) // + .doOnNext(obj -> logger.debug("Found object: {}", obj.key())) // + + ; + } + + private Mono getDataFromS3Object(String bucket, String key) { + + GetObjectRequest request = GetObjectRequest.builder() // + .bucket(bucket) // + .key(key) // + .build(); + + CompletableFuture> future = + s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); + + return Mono.fromFuture(future) // + .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) // + .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) // + .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) // + .onErrorResume(t -> Mono.empty()); + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java index ce2e1a1..4f0bc8c 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/InfoType.java +++ b/src/main/java/org/oran/dmaapadapter/repository/InfoType.java @@ -44,6 +44,12 @@ public class InfoType { @Getter private String kafkaInputTopic; + @Getter + private String inputJobType; + + @Getter + private Object inputJobDefinition; + private String dataType; @Getter diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java index 2e98d81..e6facbc 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java @@ -53,6 +53,7 @@ import reactor.core.publisher.Mono; @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public abstract class JobDataDistributor { private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class); + @Getter private final Job job; private Disposable subscription; @@ -95,8 +96,6 @@ public abstract class JobDataDistributor { } public synchronized void start(Flux input) { - stop(); - collectHistoricalData(); this.errorStats.resetIrrecoverableErrors(); @@ -117,7 +116,7 @@ public abstract class JobDataDistributor { private void collectHistoricalData() { PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null; - if (filter != null) { + if (filter != null && filter.getFilterData().getPmRopStartTime() != null) { this.fileStore.createLock(collectHistoricalDataLockName()) // .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted) : Mono.error(new LockedException(collectHistoricalDataLockName()))) // @@ -128,8 +127,6 @@ public abstract class JobDataDistributor { .flatMap(event -> filterAndBuffer(event, this.job), 1) // .flatMap(this::sendToClient, 1) // .onErrorResume(this::handleCollectHistoricalDataError) // - .collectList() // - .flatMap(list -> fileStore.deleteLock(collectHistoricalDataLockName())) // .subscribe(); } } @@ -140,9 +137,9 @@ public abstract class JobDataDistributor { logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId()); return Mono.empty(); // Ignore } else { - return fileStore.deleteLock(collectHistoricalDataLockName()) // - .map(bool -> "OK") // - .onErrorResume(t2 -> Mono.empty()); + return tryDeleteLockFile() // + .map(bool -> "OK"); + } } @@ -176,7 +173,7 @@ public abstract class JobDataDistributor { } private void handleExceptionInStream(Throwable t) { - logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId()); + logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId()); stop(); } @@ -187,6 +184,13 @@ public abstract class JobDataDistributor { this.subscription.dispose(); this.subscription = null; } + tryDeleteLockFile().subscribe(); + } + + private Mono tryDeleteLockFile() { + return fileStore.deleteLock(collectHistoricalDataLockName()) // + .doOnNext(res -> logger.debug("Removed lockfile {} {}", collectHistoricalDataLockName(), res)) + .onErrorResume(t -> Mono.just(false)); } public synchronized boolean isRunning() { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index 6b5a0f7..eaeeae4 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -20,7 +20,6 @@ package org.oran.dmaapadapter.tasks; - import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index e0f897e..7b9cbcf 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -36,6 +36,7 @@ import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.r1.ConsumerJobInfo; import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; import org.oran.dmaapadapter.repository.InfoType; @@ -128,10 +129,11 @@ public class ProducerRegstrationTask { } private Mono registerTypesAndProducer() { - final int CONCURRENCY = 20; + final int CONCURRENCY = 1; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // + .flatMap(this::createInputDataJob, CONCURRENCY) .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))), CONCURRENCY) // .collectList() // @@ -139,6 +141,29 @@ public class ProducerRegstrationTask { .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo()))); } + private Mono createInputDataJob(InfoType type) { + if (type.getInputJobType() == null) { + return Mono.just(type); + } + + ConsumerJobInfo info = + new ConsumerJobInfo(type.getInputJobType(), type.getInputJobDefinition(), "DmaapAdapter", "", ""); + + final String JOB_ID = type.getId() + "_5b3f4db6-3d9e-11ed-b878-0242ac120002"; + String body = gson.toJson(info); + + return restClient.put(consumerJobUrl(JOB_ID), body) + .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(), + t.getMessage())) + .onErrorResume(t -> Mono.just("")) // + .doOnNext(n -> logger.info("Created job: {}, type: {}", JOB_ID, type.getInputJobType())) // + .map(x -> type); + } + + private String consumerJobUrl(String jobId) { + return applicationConfig.getIcsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId; + } + private Object typeSpecifcInfoObject() { return jsonObject("{}"); } diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 7b6a3bc..84b5685 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -90,8 +90,8 @@ import reactor.test.StepVerifier; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.webclient.trust-store-used=true", // - "app.configuration-filepath=./src/test/resources/test_application_configuration.json"// -}) + "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // + "app.s3.endpointOverride="}) class ApplicationTest { @Autowired @@ -168,7 +168,7 @@ class ApplicationTest { } } - @Test + // @Test void testProtoBuf() throws Exception { String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json"; @@ -200,6 +200,7 @@ class ApplicationTest { } static class TestApplicationConfig extends ApplicationConfig { + @Override public String getIcsBaseUrl() { return thisProcessUrl(); @@ -233,7 +234,7 @@ class ApplicationTest { return new TomcatServletWebServerFactory(); } - @Override + // @Override @Bean public ApplicationConfig getApplicationConfig() { TestApplicationConfig cfg = new TestApplicationConfig(); @@ -242,7 +243,7 @@ class ApplicationTest { } @BeforeEach - void init() { + public void init() { this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort); assertThat(this.jobs.size()).isZero(); assertThat(this.consumerController.testResults.receivedBodies).isEmpty(); @@ -353,7 +354,6 @@ class ApplicationTest { @Test void testTrustValidation() throws IOException { - String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs"; ResponseEntity resp = restClient(true).getForEntity(url).block(); assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); @@ -378,7 +378,7 @@ class ApplicationTest { @Test void testReceiveAndPostDataFromKafka() throws Exception { final String JOB_ID = "ID"; - final String TYPE_ID = "KafkaInformationType"; + final String TYPE_ID = "PmDataOverKafka"; waitForRegistration(); // Create a job @@ -397,8 +397,14 @@ class ApplicationTest { ConsumerController.TestResults consumer = this.consumerController.testResults; await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1)); - assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]"); + assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]"); assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json"); + + // This only works in debugger. Removed for now. + assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull(); + assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId) + .isEqualTo("xml-file-data-to-filestore"); + } @Test @@ -481,7 +487,7 @@ class ApplicationTest { Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, new Job.BufferTimeout(123, 456), null, null); String paramJson = gson.toJson(param); - ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson)); + ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson)); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -516,7 +522,7 @@ class ApplicationTest { + "."; Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null); String paramJson = gson.toJson(param); - ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson)); + ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson)); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -583,7 +589,7 @@ class ApplicationTest { waitForRegistration(); // Create a job with JsonPath Filtering - ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath()); + ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, this.jsonObjectJsonPath()); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); @@ -617,7 +623,7 @@ class ApplicationTest { Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null); String paramJson = gson.toJson(param); - ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson)); + ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson)); this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); } diff --git a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java index ab9e15c..df30196 100644 --- a/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java +++ b/src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java @@ -63,6 +63,7 @@ public class IcsSimulatorController { ProducerRegistrationInfo registrationInfo = null; Map types = Collections.synchronizedMap(new HashMap<>()); String infoProducerId = null; + ConsumerJobInfo createdJob = null; public TestResults() {} @@ -70,6 +71,11 @@ public class IcsSimulatorController { registrationInfo = null; types.clear(); infoProducerId = null; + createdJob = null; + } + + public void setCreatedJob(ConsumerJobInfo informationJobObject) { + this.createdJob = informationJobObject; } } @@ -105,6 +111,17 @@ public class IcsSimulatorController { return new ResponseEntity<>(HttpStatus.OK); } + @PutMapping(path = "/data-consumer/v1/info-jobs/{infoJobId}", // + produces = MediaType.APPLICATION_JSON_VALUE, // + consumes = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity putIndividualInfoJob( // + @PathVariable("infoJobId") String jobId, // + @RequestBody ConsumerJobInfo informationJobObject) { + logger.info("*** added consumer job {}", jobId); + testResults.setCreatedJob(informationJobObject); + return new ResponseEntity<>(HttpStatus.OK); + } + public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) throws ServiceException { String url = this.testResults.registrationInfo.jobCallbackUrl; ProducerJobInfo request = diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index c34d0f3..4ec51ff 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -285,7 +285,7 @@ class IntegrationWithIcs { @Test void testPmFilter() throws Exception { await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue()); - final String TYPE_ID = "PmInformationType"; + final String TYPE_ID = "PmDataOverRest"; String jsonStr = reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }"); diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 9cf3dec..668a327 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -46,6 +46,8 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.WebClientConfig; import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.datastore.DataStore; +import org.oran.dmaapadapter.datastore.S3ObjectStore; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.filter.PmReportFilter; import org.oran.dmaapadapter.r1.ConsumerJobInfo; @@ -53,11 +55,7 @@ import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; import org.oran.dmaapadapter.repository.Job; import org.oran.dmaapadapter.repository.Jobs; -import org.oran.dmaapadapter.tasks.DataStore; -import org.oran.dmaapadapter.tasks.DataStore.Bucket; -import org.oran.dmaapadapter.tasks.FileStore; import org.oran.dmaapadapter.tasks.KafkaTopicListener; -import org.oran.dmaapadapter.tasks.S3ObjectStore; import org.oran.dmaapadapter.tasks.TopicListener; import org.oran.dmaapadapter.tasks.TopicListeners; import org.slf4j.Logger; @@ -83,11 +81,13 @@ import reactor.kafka.sender.SenderRecord; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // - "app.pm-files-path=./src/test/resources/", "app.s3.locksBucket=ropfilelocks", "app.s3.bucket=ropfiles"}) // + "app.pm-files-path=./src/test/resources/", // + "app.s3.locksBucket=ropfilelocks", // + "app.s3.bucket=ropfiles"}) // class IntegrationWithKafka { final String TYPE_ID = "KafkaInformationType"; - final String PM_TYPE_ID = "PmInformationTypeKafka"; + final String PM_TYPE_ID = "PmDataOverKafka"; @Autowired private ApplicationConfig applicationConfig; @@ -166,7 +166,7 @@ class IntegrationWithKafka { int count = 0; - public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) { + public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) { this.OUTPUT_TOPIC = outputTopic; // Create a listener to the output topic. The KafkaTopicListener happens to be @@ -209,8 +209,8 @@ class IntegrationWithKafka { @BeforeEach void init() { if (kafkaReceiver == null) { - kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic"); - kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2"); + kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext); + kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext); } kafkaReceiver.reset(); kafkaReceiver2.reset(); @@ -464,69 +464,6 @@ class IntegrationWithKafka { logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); } - @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. - // @Test - void kafkaCharacteristics_pmFilter_localFile() throws Exception { - // Filter PM reports and sent to two jobs over Kafka - - final String JOB_ID = "kafkaCharacteristics"; - final String JOB_ID2 = "kafkaCharacteristics2"; - - // Register producer, Register types - await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull()); - assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size()); - - PmReportFilter.FilterData filterData = new PmReportFilter.FilterData(); - filterData.getMeasTypes().add("succImmediateAssignProcs"); - filterData.getMeasObjClass().add("UtranCell"); - - this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID, - restClient()); - this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2, - restClient()); - - await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); - waitForKafkaListener(); - - final int NO_OF_OBJECTS = 100; - - Instant startTime = Instant.now(); - - KafkaTopicListener.NewFileEvent event = - KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build(); - String eventAsString = gson.toJson(event); - - var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID)); - sendDataToKafka(dataToSend); - - while (kafkaReceiver.count != NO_OF_OBJECTS) { - logger.info("sleeping {}", kafkaReceiver.count); - Thread.sleep(1000 * 1); - } - - final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond(); - logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds); - logger.info("*** kafkaReceiver2 :" + kafkaReceiver.count); - - printStatistics(); - } - - @Test - void testListFiles() { - FileStore fileStore = new FileStore(applicationConfig); - - fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"), - "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block(); - - List files = fileStore.listFiles(Bucket.FILES, "O-DU-112").collectList().block(); - assertThat(files).hasSize(1); - - files = fileStore.listFiles(Bucket.FILES, "O-DU-1122").collectList().block(); - assertThat(files).hasSize(1); - - fileStore.deleteObject(Bucket.FILES, files.get(0)); - } - @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. @Test void kafkaCharacteristics_pmFilter_s3() throws Exception { diff --git a/src/test/resources/test_application_configuration.json b/src/test/resources/test_application_configuration.json index 2863590..64ef1c5 100644 --- a/src/test/resources/test_application_configuration.json +++ b/src/test/resources/test_application_configuration.json @@ -8,21 +8,29 @@ }, { "id": "KafkaInformationType", - "kafkaInputTopic": "KafkaInput", - "useHttpProxy": false + "kafkaInputTopic": "KafkaInput" }, { - "id": "PmInformationType", + "id": "PmDataOverRest", "dmaapTopicUrl": "/dmaap-topic-2", "useHttpProxy": false, "dataType": "PmData", "isJson": true }, { - "id": "PmInformationTypeKafka", - "kafkaInputTopic": "PmFileData", - "useHttpProxy": false, + "id": "PmDataOverKafka", + "kafkaInputTopic": "FileReadyEvent", "dataType": "PmData", + "inputJobType": "xml-file-data-to-filestore", + "inputJobDefinition": { + "kafkaOutputTopic": "FileReadyEvent", + "filestore-output-bucket": "pm-files-json", + "filterType": "pmdata", + "filter": { + "inputCompression": "xml.gz", + "outputCompression": "none" + } + }, "isJson": true } ]