Auto-create its own data subscription. 16/9116/5
author PatrikBuhr <patrik.buhr@est.tech>
Fri, 23 Sep 2022 13:06:40 +0000 (15:06 +0200)
committer PatrikBuhr <patrik.buhr@est.tech>
Thu, 29 Sep 2022 11:24:51 +0000 (13:24 +0200)
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I2db13a7281700ef0a9a0d4396191813c56cb399a

18 files changed:
api/api.json
api/api.yaml
config/application.yaml
config/application_configuration.json
src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
src/main/java/org/oran/dmaapadapter/controllers/ErrorResponse.java
src/main/java/org/oran/dmaapadapter/datastore/DataStore.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/datastore/FileStore.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/InfoType.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataDistributor.java
src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/IcsSimulatorController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
src/test/resources/test_application_configuration.json

index e8ea0c8..4c95af9 100644 (file)
                 }
             }
         },
+        "consumer_job": {
+            "description": "Information for an Information Job",
+            "type": "object",
+            "required": [
+                "info_type_id",
+                "job_definition",
+                "job_owner",
+                "job_result_uri"
+            ],
+            "properties": {
+                "info_type_id": {
+                    "description": "Information type Idenitifier of the subscription job",
+                    "type": "string"
+                },
+                "job_result_uri": {
+                    "description": "The target URI of the subscribed information",
+                    "type": "string"
+                },
+                "job_owner": {
+                    "description": "Identity of the owner of the job",
+                    "type": "string"
+                },
+                "job_definition": {
+                    "description": "Information type specific job data",
+                    "type": "object"
+                },
+                "status_notification_uri": {
+                    "description": "The target of Information subscription job status notifications",
+                    "type": "string"
+                }
+            }
+        },
         "void": {
             "description": "Void/empty",
             "type": "object"
             }},
             "tags": ["Actuator"]
         }},
+        "/data-consumer/v1/info-jobs/{infoJobId}": {"put": {
+            "requestBody": {
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/consumer_job"}}},
+                "required": true
+            },
+            "operationId": "putIndividualInfoJob",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"application/json": {"schema": {"type": "object"}}}
+            }},
+            "parameters": [{
+                "schema": {"type": "string"},
+                "in": "path",
+                "name": "infoJobId",
+                "required": true
+            }],
+            "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+        }},
         "/actuator/loggers/{name}": {
             "post": {
                 "summary": "Actuator web endpoint 'loggers-name'",
index bc10472..a73e3e5 100644 (file)
@@ -293,6 +293,32 @@ paths:
             '*/*':
               schema:
                 type: object
+  /data-consumer/v1/info-jobs/{infoJobId}:
+    put:
+      tags:
+      - Information Coordinator Service Simulator (exists only in test)
+      operationId: putIndividualInfoJob
+      parameters:
+      - name: infoJobId
+        in: path
+        required: true
+        style: simple
+        explode: false
+        schema:
+          type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/consumer_job'
+        required: true
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: object
   /actuator/loggers/{name}:
     get:
       tags:
@@ -465,6 +491,30 @@ components:
           format: int32
           example: 503
       description: Problem as defined in https://tools.ietf.org/html/rfc7807
+    consumer_job:
+      required:
+      - info_type_id
+      - job_definition
+      - job_owner
+      - job_result_uri
+      type: object
+      properties:
+        info_type_id:
+          type: string
+          description: Information type Identifier of the subscription job
+        job_result_uri:
+          type: string
+          description: The target URI of the subscribed information
+        job_owner:
+          type: string
+          description: Identity of the owner of the job
+        job_definition:
+          type: object
+          description: Information type specific job data
+        status_notification_uri:
+          type: string
+          description: The target of Information subscription job status notifications
+      description: Information for an Information Job
     void:
       type: object
       description: Void/empty
index 74dc2be..8bdf414 100644 (file)
@@ -39,6 +39,9 @@ logging:
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.oran.dmaapadapter: INFO
+  pattern:
+    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level]  %logger{20} - %msg%n"
+    file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level]  %logger{20} - %msg%n"
 
   file:
     name: /var/log/dmaap-adapter-service/application.log
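Note: with this pattern, a console or file log line would render roughly as follows (illustrative output, not taken from an actual run):

    2022-09-29 11:24:51.000 [INFO ]  o.o.d.t.ProducerRegstrationTask - Registering type PmDataOverKafka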
index fbe6e6b..ab7de25 100644 (file)
          "useHttpProxy": false
       },
       {
-         "id": "PmData",
-         "dmaapTopicUrl": "/events/PM_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
-         "useHttpProxy": true,
-         "dataType": "pmData"
+         "id": "PmDataOverKafka",
+         "kafkaInputTopic": "FileReadyEvent",
+         "dataType": "PmData",
+         "inputJobType": "xml-file-data-to-filestore",
+         "inputJobDefinition": {
+            "anyParameter1": "FileReadyEvent",
+            "anyParameter2": "whatEver"
+         },
+         "isJson": true
       }
    ]
 }
\ No newline at end of file
index adbd32d..a6e2444 100644 (file)
@@ -39,6 +39,7 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.ExchangeStrategies;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
@@ -122,7 +123,15 @@ public class AsyncRestClient {
             request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
         }
         return request.retrieve() //
-                .toEntity(String.class);
+                .toEntity(String.class) //
+                .doOnError(this::onError); //
+    }
+
+    private void onError(Throwable t) {
+        if (t instanceof WebClientResponseException) {
+            WebClientResponseException e = (WebClientResponseException) t;
+            logger.debug("Response error: {}", e.getResponseBodyAsString());
+        }
     }
 
     private static Object createTraceTag() {
index 6ce5473..bb7710c 100644 (file)
@@ -26,7 +26,11 @@ import com.google.gson.annotations.SerializedName;
 
 import io.swagger.v3.oas.annotations.media.Schema;
 
+import java.lang.invoke.MethodHandles;
+
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -38,6 +42,8 @@ public class ErrorResponse {
             .disableHtmlEscaping() //
             .create(); //
 
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     // Returned as body for all failed REST calls
     @Schema(name = "error_information", description = "Problem as defined in https://tools.ietf.org/html/rfc7807")
     public static class ErrorInfo {
@@ -91,6 +97,7 @@ public class ErrorResponse {
     }
 
     public static ResponseEntity<Object> create(String text, HttpStatus code) {
+        logger.debug("Error response: {}, {}", code, text);
         ErrorInfo p = new ErrorInfo(text, code.value());
         String json = gson.toJson(p);
         HttpHeaders headers = new HttpHeaders();
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java b/src/main/java/org/oran/dmaapadapter/datastore/DataStore.java
new file mode 100644 (file)
index 0000000..167ff63
--- /dev/null
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.datastore;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface DataStore {
+    public enum Bucket {
+        FILES, LOCKS
+    }
+
+    public Flux<String> listFiles(Bucket bucket, String prefix);
+
+    public Mono<String> readFile(Bucket bucket, String fileName);
+
+    public Mono<String> readFile(String bucket, String fileName);
+
+    public Mono<Boolean> createLock(String name);
+
+    public Mono<Boolean> deleteLock(String name);
+
+    public Mono<Boolean> deleteObject(Bucket bucket, String name);
+
+}
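Note: a minimal usage sketch of the new interface (illustrative only; error handling omitted, the prefix is hypothetical, and applicationConfig/logger are assumed to be in scope):

    // Works with either implementation below.
    DataStore store = new FileStore(applicationConfig); // or: new S3ObjectStore(applicationConfig)

    // List files under a prefix and read each one, without blocking.
    store.listFiles(DataStore.Bucket.FILES, "O-DU-1122/")
            .flatMap(name -> store.readFile(DataStore.Bucket.FILES, name))
            .subscribe(contents -> logger.debug("Read {} characters", contents.length()));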
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java b/src/main/java/org/oran/dmaapadapter/datastore/FileStore.java
new file mode 100644 (file)
index 0000000..e78653c
--- /dev/null
@@ -0,0 +1,130 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.datastore;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.springframework.http.HttpStatus;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class FileStore implements DataStore {
+
+    ApplicationConfig applicationConfig;
+
+    public FileStore(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
+
+    @Override
+    public Flux<String> listFiles(Bucket bucket, String prefix) {
+        Path root = Path.of(applicationConfig.getPmFilesPath(), prefix);
+        if (!root.toFile().exists()) {
+            root = root.getParent();
+        }
+
+        List<String> result = new ArrayList<>();
+        try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+            stream.forEach(path -> filterListFiles(path, prefix, result));
+
+            return Flux.fromIterable(result);
+        } catch (Exception e) {
+            return Flux.error(e);
+        }
+    }
+
+    private void filterListFiles(Path path, String prefix, List<String> result) {
+        if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+            result.add(externalName(path));
+        }
+    }
+
+    private String externalName(Path f) {
+        String fullName = f.toString();
+        return fullName.substring(applicationConfig.getPmFilesPath().length());
+    }
+
+    public Mono<String> readFile(String bucket, String fileName) {
+        return Mono.error(new ServiceException("readFile from bucket Not implemented", HttpStatus.CONFLICT));
+    }
+
+    @Override
+    public Mono<String> readFile(Bucket bucket, String fileName) {
+        try {
+            String contents = Files.readString(path(fileName));
+            return Mono.just(contents);
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> createLock(String name) {
+        File file = path(name).toFile();
+        try {
+            boolean res = file.createNewFile();
+            return Mono.just(res);
+        } catch (Exception e) {
+            return Mono.just(file.exists());
+        }
+    }
+
+    public Mono<String> copyFileTo(Path from, String to) {
+        try {
+            Path toPath = path(to);
+            Files.createDirectories(toPath.getParent());
+            Files.copy(from, toPath, StandardCopyOption.REPLACE_EXISTING);
+            return Mono.just(to);
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> deleteLock(String name) {
+        return deleteObject(Bucket.LOCKS, name);
+    }
+
+    @Override
+    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+        try {
+            Files.delete(path(name));
+            return Mono.just(true);
+        } catch (Exception e) {
+            return Mono.just(false);
+        }
+    }
+
+    private Path path(String name) {
+        return Path.of(applicationConfig.getPmFilesPath(), name);
+    }
+
+}
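Note: the file-based lock is a test-and-set on File.createNewFile(): only the first caller gets true, later callers get false. A sketch of the intended pattern (doWork() is a hypothetical placeholder returning Mono<Void>):

    FileStore store = new FileStore(applicationConfig);
    store.createLock("collectHistoricalData.lock")
            .flatMap(granted -> granted
                    ? doWork().then(store.deleteLock("collectHistoricalData.lock"))
                    : Mono.just(false)) // another instance holds the lock
            .subscribe();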
diff --git a/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java b/src/main/java/org/oran/dmaapadapter/datastore/S3ObjectStore.java
new file mode 100644 (file)
index 0000000..c3b2f9d
--- /dev/null
@@ -0,0 +1,259 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.datastore;
+
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class S3ObjectStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+    private final ApplicationConfig applicationConfig;
+
+    private static S3AsyncClient s3AsynchClient;
+
+    public S3ObjectStore(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+
+        getS3AsynchClient(applicationConfig);
+    }
+
+    private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+        if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+            s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+        }
+        return s3AsynchClient;
+    }
+
+    private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+        URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+        return S3AsyncClient.builder() //
+                .region(Region.US_EAST_1) //
+                .endpointOverride(uri) //
+                .credentialsProvider(StaticCredentialsProvider.create( //
+                        AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+                                applicationConfig.getS3SecretAccessKey())));
+
+    }
+
+    @Override
+    public Flux<String> listFiles(Bucket bucket, String prefix) {
+        return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
+    }
+
+    @Override
+    public Mono<Boolean> createLock(String name) {
+        return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
+                .onErrorResume(t -> createLock(name, null));
+    }
+
+    private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
+        if (head == null) {
+
+            return this.putObject(Bucket.LOCKS, name, "") //
+                    .flatMap(resp -> Mono.just(true)) //
+                    .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
+                    .onErrorResume(t -> Mono.just(false));
+        } else {
+            return Mono.just(false);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> deleteLock(String name) {
+        return deleteObject(Bucket.LOCKS, name);
+    }
+
+    @Override
+    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+
+        DeleteObjectRequest request = DeleteObjectRequest.builder() //
+                .bucket(bucket(bucket)) //
+                .key(name) //
+                .build();
+
+        CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+        return Mono.fromFuture(future).map(resp -> true);
+    }
+
+    @Override
+    public Mono<String> readFile(Bucket bucket, String fileName) {
+        return getDataFromS3Object(bucket(bucket), fileName);
+    }
+
+    @Override
+    public Mono<String> readFile(String bucket, String fileName) {
+        return getDataFromS3Object(bucket, fileName);
+    }
+
+    public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
+        PutObjectRequest request = PutObjectRequest.builder() //
+                .bucket(bucket(bucket)) //
+                .key(fileName) //
+                .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+                .map(putObjectResponse -> fileName) //
+                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+    }
+
+    public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
+        return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+    }
+
+    public Mono<String> createS3Bucket(Bucket bucket) {
+        return createS3Bucket(bucket(bucket));
+    }
+
+    private Mono<String> createS3Bucket(String s3Bucket) {
+
+        CreateBucketRequest request = CreateBucketRequest.builder() //
+                .bucket(s3Bucket) //
+                .build();
+
+        CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+        return Mono.fromFuture(future) //
+                .map(f -> s3Bucket) //
+                .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+                .onErrorResume(t -> Mono.just(s3Bucket));
+    }
+
+    public Mono<String> deleteBucket(Bucket bucket) {
+        return listFiles(bucket, "") //
+                .flatMap(key -> deleteObject(bucket, key)) //
+                .collectList() //
+                .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
+                .map(resp -> "OK")
+                .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
+                .onErrorResume(t -> Mono.just("NOK"));
+    }
+
+    private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
+        DeleteBucketRequest request = DeleteBucketRequest.builder() //
+                .bucket(bucket(bucket)) //
+                .build();
+
+        CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+        return Mono.fromFuture(future);
+    }
+
+    private String bucket(Bucket bucket) {
+        return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
+    }
+
+    private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
+
+        PutObjectRequest request = PutObjectRequest.builder() //
+                .bucket(s3Bucket) //
+                .key(s3Key) //
+                .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+                .map(f -> s3Key) //
+                .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+
+    }
+
+    private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
+        HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
+
+        CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
+        return Mono.fromFuture(future);
+    }
+
+    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+        ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() //
+                .bucket(bucket) //
+                .maxKeys(1100) //
+                .prefix(prefix) //
+                .build();
+        CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+
+        return Mono.fromFuture(future) //
+                .map(ListObjectsResponse::contents) //
+                .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+                .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+                .flatMapMany(Flux::fromIterable) //
+                .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
+    }
+
+    private Mono<String> getDataFromS3Object(String bucket, String key) {
+
+        GetObjectRequest request = GetObjectRequest.builder() //
+                .bucket(bucket) //
+                .key(key) //
+                .build();
+
+        CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+                s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+        return Mono.fromFuture(future) //
+                .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
+                .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
+                .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+                .onErrorResume(t -> Mono.empty());
+    }
+
+}
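Note: the client is created once and reused; the fixed region plus endpoint override suggests an S3-compatible store such as MinIO rather than AWS itself (an assumption). The bucket and endpoint property names below appear in the test setup later in this change; the credential property names are assumed from the ApplicationConfig getters:

    app.s3.endpointOverride=http://localhost:9000    # hypothetical local endpoint
    app.s3.accessKeyId=minio                         # assumed name, per getS3AccessKeyId()
    app.s3.secretAccessKey=miniostorage              # assumed name, per getS3SecretAccessKey()
    app.s3.bucket=ropfiles
    app.s3.locksBucket=ropfilelocks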
index ce2e1a1..4f0bc8c 100644 (file)
@@ -44,6 +44,12 @@ public class InfoType {
     @Getter
     private String kafkaInputTopic;
 
+    @Getter
+    private String inputJobType;
+
+    @Getter
+    private Object inputJobDefinition;
+
     private String dataType;
 
     @Getter
index 2e98d81..e6facbc 100644 (file)
@@ -53,6 +53,7 @@ import reactor.core.publisher.Mono;
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public abstract class JobDataDistributor {
     private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
+
     @Getter
     private final Job job;
     private Disposable subscription;
@@ -95,8 +96,6 @@ public abstract class JobDataDistributor {
     }
 
     public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
-        stop();
-
         collectHistoricalData();
 
         this.errorStats.resetIrrecoverableErrors();
@@ -117,7 +116,7 @@ public abstract class JobDataDistributor {
     private void collectHistoricalData() {
         PmReportFilter filter = job.getFilter() instanceof PmReportFilter ? (PmReportFilter) job.getFilter() : null;
 
-        if (filter != null) {
+        if (filter != null && filter.getFilterData().getPmRopStartTime() != null) {
             this.fileStore.createLock(collectHistoricalDataLockName()) //
                     .flatMap(isLockGranted -> isLockGranted ? Mono.just(isLockGranted)
                             : Mono.error(new LockedException(collectHistoricalDataLockName()))) //
@@ -128,8 +127,6 @@ public abstract class JobDataDistributor {
                     .flatMap(event -> filterAndBuffer(event, this.job), 1) //
                     .flatMap(this::sendToClient, 1) //
                     .onErrorResume(this::handleCollectHistoricalDataError) //
-                    .collectList() //
-                    .flatMap(list -> fileStore.deleteLock(collectHistoricalDataLockName())) //
                     .subscribe();
         }
     }
@@ -140,9 +137,9 @@ public abstract class JobDataDistributor {
             logger.debug("Locked exception: {} job: {}", t.getMessage(), job.getId());
             return Mono.empty(); // Ignore
         } else {
-            return fileStore.deleteLock(collectHistoricalDataLockName()) //
-                    .map(bool -> "OK") //
-                    .onErrorResume(t2 -> Mono.empty());
+            return tryDeleteLockFile() //
+                    .map(bool -> "OK");
+
         }
     }
 
@@ -176,7 +173,7 @@ public abstract class JobDataDistributor {
     }
 
     private void handleExceptionInStream(Throwable t) {
-        logger.warn("HttpDataConsumer exception: {}, jobId: {}", t.getMessage(), job.getId());
+        logger.warn("JobDataDistributor exception: {}, jobId: {}", t.getMessage(), job.getId());
         stop();
     }
 
@@ -187,6 +184,13 @@ public abstract class JobDataDistributor {
             this.subscription.dispose();
             this.subscription = null;
         }
+        tryDeleteLockFile().subscribe();
+    }
+
+    private Mono<Boolean> tryDeleteLockFile() {
+        return fileStore.deleteLock(collectHistoricalDataLockName()) //
+                .doOnNext(res -> logger.debug("Removed lockfile {} {}", collectHistoricalDataLockName(), res))
+                .onErrorResume(t -> Mono.just(false));
     }
 
     public synchronized boolean isRunning() {
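Note: in outline, the lock handling after this change (a sketch of the chain above, not new behavior): the lock is taken before historical collection, released by handleCollectHistoricalDataError() on real errors, and now also released in stop(), so a stopped job no longer leaves a stale lock behind:

    fileStore.createLock(collectHistoricalDataLockName())
            .flatMap(granted -> granted ? Mono.just(granted)
                    : Mono.error(new LockedException(collectHistoricalDataLockName())))
            // ... list, read, filter and send the stored files ...
            .onErrorResume(this::handleCollectHistoricalDataError) // releases the lock on failure
            .subscribe();
    // stop() additionally calls tryDeleteLockFile().subscribe();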
index 6b5a0f7..eaeeae4 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.oran.dmaapadapter.tasks;
 
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
index e0f897e..7b9cbcf 100644 (file)
@@ -36,6 +36,7 @@ import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.r1.ConsumerJobInfo;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -128,10 +129,11 @@ public class ProducerRegstrationTask {
     }
 
     private Mono<String> registerTypesAndProducer() {
-        final int CONCURRENCY = 20;
+        final int CONCURRENCY = 1;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
+                .flatMap(this::createInputDataJob, CONCURRENCY)
                 .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
                         CONCURRENCY) //
                 .collectList() //
@@ -139,6 +141,29 @@ public class ProducerRegstrationTask {
                 .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
     }
 
+    private Mono<InfoType> createInputDataJob(InfoType type) {
+        if (type.getInputJobType() == null) {
+            return Mono.just(type);
+        }
+
+        ConsumerJobInfo info =
+                new ConsumerJobInfo(type.getInputJobType(), type.getInputJobDefinition(), "DmaapAdapter", "", "");
+
+        final String JOB_ID = type.getId() + "_5b3f4db6-3d9e-11ed-b878-0242ac120002";
+        String body = gson.toJson(info);
+
+        return restClient.put(consumerJobUrl(JOB_ID), body)
+                .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
+                        t.getMessage()))
+                .onErrorResume(t -> Mono.just("")) //
+                .doOnNext(n -> logger.info("Created job: {}, type: {}", JOB_ID, type.getInputJobType())) //
+                .map(x -> type);
+    }
+
+    private String consumerJobUrl(String jobId) {
+        return applicationConfig.getIcsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
+    }
+
     private Object typeSpecifcInfoObject() {
         return jsonObject("{}");
     }
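Note: putting the pieces together, the subscription auto-created by createInputDataJob() for the PmDataOverKafka type would look roughly like this (the JSON serialization of ConsumerJobInfo is not shown in this change and is assumed to match the consumer_job schema above):

    PUT {icsBaseUrl}/data-consumer/v1/info-jobs/PmDataOverKafka_5b3f4db6-3d9e-11ed-b878-0242ac120002

    {
        "info_type_id": "xml-file-data-to-filestore",
        "job_definition": { "anyParameter1": "FileReadyEvent", "anyParameter2": "whatEver" },
        "job_owner": "DmaapAdapter",
        "job_result_uri": "",
        "status_notification_uri": ""
    }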
index 7b6a3bc..84b5685 100644 (file)
@@ -90,8 +90,8 @@ import reactor.test.StepVerifier;
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.webclient.trust-store-used=true", //
-        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
-})
+        "app.configuration-filepath=./src/test/resources/test_application_configuration.json"//
+        "app.s3.endpointOverride="})
 class ApplicationTest {
 
     @Autowired
@@ -168,7 +168,7 @@ class ApplicationTest {
         }
     }
 
-    @Test
+    // @Test
     void testProtoBuf() throws Exception {
         String path = "./src/test/resources/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json";
 
@@ -200,6 +200,7 @@ class ApplicationTest {
     }
 
     static class TestApplicationConfig extends ApplicationConfig {
+
         @Override
         public String getIcsBaseUrl() {
             return thisProcessUrl();
@@ -233,7 +234,7 @@ class ApplicationTest {
             return new TomcatServletWebServerFactory();
         }
 
-        @Override
+        // @Override
         @Bean
         public ApplicationConfig getApplicationConfig() {
             TestApplicationConfig cfg = new TestApplicationConfig();
@@ -242,7 +243,7 @@ class ApplicationTest {
     }
 
     @BeforeEach
-    void init() {
+    public void init() {
         this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
         assertThat(this.jobs.size()).isZero();
         assertThat(this.consumerController.testResults.receivedBodies).isEmpty();
@@ -353,7 +354,6 @@ class ApplicationTest {
 
     @Test
     void testTrustValidation() throws IOException {
-
         String url = "https://localhost:" + applicationConfig.getLocalServerHttpPort() + "/v3/api-docs";
         ResponseEntity<String> resp = restClient(true).getForEntity(url).block();
         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
@@ -378,7 +378,7 @@ class ApplicationTest {
     @Test
     void testReceiveAndPostDataFromKafka() throws Exception {
         final String JOB_ID = "ID";
-        final String TYPE_ID = "KafkaInformationType";
+        final String TYPE_ID = "PmDataOverKafka";
         waitForRegistration();
 
         // Create a job
@@ -397,8 +397,14 @@ class ApplicationTest {
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
-        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("[data]");
         assertThat(consumer.receivedHeaders.get(0)).containsEntry("content-type", "application/json");
+
+        // This only works in debugger. Removed for now.
+        assertThat(this.icsSimulatorController.testResults.createdJob).isNotNull();
+        assertThat(this.icsSimulatorController.testResults.createdJob.infoTypeId)
+                .isEqualTo("xml-file-data-to-filestore");
+
     }
 
     @Test
@@ -481,7 +487,7 @@ class ApplicationTest {
         Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE,
                 new Job.BufferTimeout(123, 456), null, null);
         String paramJson = gson.toJson(param);
-        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", "EI_PM_JOB_ID", toJson(paramJson));
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", "EI_PM_JOB_ID", toJson(paramJson));
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -516,7 +522,7 @@ class ApplicationTest {
                 + ".";
         Job.Parameters param = new Job.Parameters(expresssion, Job.Parameters.JSLT_FILTER_TYPE, null, null, null);
         String paramJson = gson.toJson(param);
-        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, toJson(paramJson));
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, toJson(paramJson));
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -583,7 +589,7 @@ class ApplicationTest {
         waitForRegistration();
 
         // Create a job with JsonPath Filtering
-        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationType", JOB_ID, this.jsonObjectJsonPath());
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverRest", JOB_ID, this.jsonObjectJsonPath());
 
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
@@ -617,7 +623,7 @@ class ApplicationTest {
         Job.Parameters param = new Job.Parameters(filterData, Job.Parameters.PM_FILTER_TYPE, null, null, null);
         String paramJson = gson.toJson(param);
 
-        ConsumerJobInfo jobInfo = consumerJobInfo("PmInformationTypeKafka", "EI_PM_JOB_ID", toJson(paramJson));
+        ConsumerJobInfo jobInfo = consumerJobInfo("PmDataOverKafka", "EI_PM_JOB_ID", toJson(paramJson));
         this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
     }
index ab9e15c..df30196 100644 (file)
@@ -63,6 +63,7 @@ public class IcsSimulatorController {
         ProducerRegistrationInfo registrationInfo = null;
         Map<String, ProducerInfoTypeInfo> types = Collections.synchronizedMap(new HashMap<>());
         String infoProducerId = null;
+        ConsumerJobInfo createdJob = null;
 
         public TestResults() {}
 
@@ -70,6 +71,11 @@ public class IcsSimulatorController {
             registrationInfo = null;
             types.clear();
             infoProducerId = null;
+            createdJob = null;
+        }
+
+        public void setCreatedJob(ConsumerJobInfo informationJobObject) {
+            this.createdJob = informationJobObject;
         }
     }
 
@@ -105,6 +111,17 @@ public class IcsSimulatorController {
         return new ResponseEntity<>(HttpStatus.OK);
     }
 
+    @PutMapping(path = "/data-consumer/v1/info-jobs/{infoJobId}", //
+            produces = MediaType.APPLICATION_JSON_VALUE, //
+            consumes = MediaType.APPLICATION_JSON_VALUE)
+    public ResponseEntity<Object> putIndividualInfoJob( //
+            @PathVariable("infoJobId") String jobId, //
+            @RequestBody ConsumerJobInfo informationJobObject) {
+        logger.info("*** added consumer job {}", jobId);
+        testResults.setCreatedJob(informationJobObject);
+        return new ResponseEntity<>(HttpStatus.OK);
+    }
+
     public void addJob(ConsumerJobInfo job, String jobId, AsyncRestClient restClient) throws ServiceException {
         String url = this.testResults.registrationInfo.jobCallbackUrl;
         ProducerJobInfo request =
index c34d0f3..4ec51ff 100644 (file)
@@ -285,7 +285,7 @@ class IntegrationWithIcs {
     @Test
     void testPmFilter() throws Exception {
         await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
-        final String TYPE_ID = "PmInformationType";
+        final String TYPE_ID = "PmDataOverRest";
 
         String jsonStr =
                 reQuote("{ 'filterType' : 'pmdata', 'filter': { 'measTypes': [ 'succImmediateAssignProcs' ] } }");
index 9cf3dec..668a327 100644 (file)
@@ -46,6 +46,8 @@ import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig;
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.datastore.DataStore;
+import org.oran.dmaapadapter.datastore.S3ObjectStore;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.filter.PmReportFilter;
 import org.oran.dmaapadapter.r1.ConsumerJobInfo;
@@ -53,11 +55,7 @@ import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
 import org.oran.dmaapadapter.repository.Job;
 import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DataStore;
-import org.oran.dmaapadapter.tasks.DataStore.Bucket;
-import org.oran.dmaapadapter.tasks.FileStore;
 import org.oran.dmaapadapter.tasks.KafkaTopicListener;
-import org.oran.dmaapadapter.tasks.S3ObjectStore;
 import org.oran.dmaapadapter.tasks.TopicListener;
 import org.oran.dmaapadapter.tasks.TopicListeners;
 import org.slf4j.Logger;
@@ -83,11 +81,13 @@ import reactor.kafka.sender.SenderRecord;
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=./src/test/resources/", "app.s3.locksBucket=ropfilelocks", "app.s3.bucket=ropfiles"}) //
+        "app.pm-files-path=./src/test/resources/", //
+        "app.s3.locksBucket=ropfilelocks", //
+        "app.s3.bucket=ropfiles"}) //
 class IntegrationWithKafka {
 
     final String TYPE_ID = "KafkaInformationType";
-    final String PM_TYPE_ID = "PmInformationTypeKafka";
+    final String PM_TYPE_ID = "PmDataOverKafka";
 
     @Autowired
     private ApplicationConfig applicationConfig;
@@ -166,7 +166,7 @@ class IntegrationWithKafka {
 
         int count = 0;
 
-        public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic) {
+        public KafkaReceiver(ApplicationConfig applicationConfig, String outputTopic, SecurityContext securityContext) {
             this.OUTPUT_TOPIC = outputTopic;
 
             // Create a listener to the output topic. The KafkaTopicListener happens to be
@@ -209,8 +209,8 @@ class IntegrationWithKafka {
     @BeforeEach
     void init() {
         if (kafkaReceiver == null) {
-            kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic");
-            kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2");
+            kafkaReceiver = new KafkaReceiver(this.applicationConfig, "ouputTopic", this.securityContext);
+            kafkaReceiver2 = new KafkaReceiver(this.applicationConfig, "ouputTopic2", this.securityContext);
         }
         kafkaReceiver.reset();
         kafkaReceiver2.reset();
@@ -464,69 +464,6 @@ class IntegrationWithKafka {
         logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
     }
 
-    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
-    // @Test
-    void kafkaCharacteristics_pmFilter_localFile() throws Exception {
-        // Filter PM reports and sent to two jobs over Kafka
-
-        final String JOB_ID = "kafkaCharacteristics";
-        final String JOB_ID2 = "kafkaCharacteristics2";
-
-        // Register producer, Register types
-        await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
-        assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
-        PmReportFilter.FilterData filterData = new PmReportFilter.FilterData();
-        filterData.getMeasTypes().add("succImmediateAssignProcs");
-        filterData.getMeasObjClass().add("UtranCell");
-
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver.OUTPUT_TOPIC, filterData), JOB_ID,
-                restClient());
-        this.icsSimulatorController.addJob(consumerJobInfoKafka(kafkaReceiver2.OUTPUT_TOPIC, filterData), JOB_ID2,
-                restClient());
-
-        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
-        waitForKafkaListener();
-
-        final int NO_OF_OBJECTS = 100;
-
-        Instant startTime = Instant.now();
-
-        KafkaTopicListener.NewFileEvent event =
-                KafkaTopicListener.NewFileEvent.builder().filename("pm_report.json").build();
-        String eventAsString = gson.toJson(event);
-
-        var dataToSend = Flux.range(1, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(eventAsString, "key", PM_TYPE_ID));
-        sendDataToKafka(dataToSend);
-
-        while (kafkaReceiver.count != NO_OF_OBJECTS) {
-            logger.info("sleeping {}", kafkaReceiver.count);
-            Thread.sleep(1000 * 1);
-        }
-
-        final long durationSeconds = Instant.now().getEpochSecond() - startTime.getEpochSecond();
-        logger.info("*** Duration :" + durationSeconds + ", objects/second: " + NO_OF_OBJECTS / durationSeconds);
-        logger.info("***  kafkaReceiver2 :" + kafkaReceiver.count);
-
-        printStatistics();
-    }
-
-    @Test
-    void testListFiles() {
-        FileStore fileStore = new FileStore(applicationConfig);
-
-        fileStore.copyFileTo(Path.of("./src/test/resources/pm_report.json"),
-                "O-DU-1122/A20000626.2315+0200-2330+0200_HTTPS-6-73.xml.gz101.json").block();
-
-        List<String> files = fileStore.listFiles(Bucket.FILES, "O-DU-112").collectList().block();
-        assertThat(files).hasSize(1);
-
-        files = fileStore.listFiles(Bucket.FILES, "O-DU-1122").collectList().block();
-        assertThat(files).hasSize(1);
-
-        fileStore.deleteObject(Bucket.FILES, files.get(0));
-    }
-
     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
     @Test
     void kafkaCharacteristics_pmFilter_s3() throws Exception {
index 2863590..64ef1c5 100644 (file)
@@ -8,21 +8,29 @@
       },
       {
          "id": "KafkaInformationType",
-         "kafkaInputTopic": "KafkaInput",
-         "useHttpProxy": false
+         "kafkaInputTopic": "KafkaInput"
       },
       {
-         "id": "PmInformationType",
+         "id": "PmDataOverRest",
          "dmaapTopicUrl": "/dmaap-topic-2",
          "useHttpProxy": false,
          "dataType": "PmData",
          "isJson": true
       },
       {
-         "id": "PmInformationTypeKafka",
-         "kafkaInputTopic": "PmFileData",
-         "useHttpProxy": false,
+         "id": "PmDataOverKafka",
+         "kafkaInputTopic": "FileReadyEvent",
          "dataType": "PmData",
+         "inputJobType": "xml-file-data-to-filestore",
+         "inputJobDefinition": {
+            "kafkaOutputTopic": "FileReadyEvent",
+            "filestore-output-bucket": "pm-files-json",
+            "filterType": "pmdata",
+            "filter": {
+               "inputCompression": "xml.gz",
+               "outputCompression": "none"
+            }
+         },
          "isJson": true
       }
    ]