Bugfix in RIC synchronization
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / utils / MockA1Client.java
index d6cd533..3265d76 100644 (file)
@@ -2,7 +2,7 @@
  * ========================LICENSE_START=================================
  * O-RAN-SC
  * %%
- * Copyright (C) 2019 Nordix Foundation
+ * Copyright (C) 2020 Nordix Foundation
  * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,7 +20,8 @@
 
 package org.oransc.policyagent.utils;
 
-import java.util.Collection;
+import java.time.Duration;
+import java.util.List;
 import java.util.Vector;
 
 import org.oransc.policyagent.clients.A1Client;
@@ -28,43 +29,48 @@ import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
 
 public class MockA1Client implements A1Client {
     Policies policies = new Policies();
     private final PolicyTypes policyTypes;
+    private final Duration asynchDelay;
 
-    public MockA1Client(PolicyTypes policyTypes) {
+    public MockA1Client(PolicyTypes policyTypes, Duration asynchDelay) {
         this.policyTypes = policyTypes;
+        this.asynchDelay = asynchDelay;
     }
 
     @Override
-    public Mono<Collection<String>> getPolicyTypeIdentities() {
+    public Mono<List<String>> getPolicyTypeIdentities() {
         synchronized (this.policyTypes) {
-            Vector<String> result = new Vector<>();
+            List<String> result = new Vector<>();
             for (PolicyType p : this.policyTypes.getAll()) {
                 result.add(p.name());
             }
-            return Mono.just(result);
+            return mono(result);
         }
     }
 
     @Override
-    public Mono<Collection<String>> getPolicyIdentities() {
+    public Mono<List<String>> getPolicyIdentities() {
         synchronized (this.policies) {
             Vector<String> result = new Vector<>();
             for (Policy policy : policies.getAll()) {
                 result.add(policy.id());
             }
 
-            return Mono.just(result);
+            return mono(result);
         }
     }
 
     @Override
     public Mono<String> getPolicyTypeSchema(String policyTypeId) {
         try {
-            return Mono.just(this.policyTypes.getType(policyTypeId).schema());
+            return mono(this.policyTypes.getType(policyTypeId).schema());
         } catch (Exception e) {
             return Mono.error(e);
         }
@@ -73,13 +79,14 @@ public class MockA1Client implements A1Client {
     @Override
     public Mono<String> putPolicy(Policy p) {
         this.policies.put(p);
-        return Mono.just("OK");
+        return mono("OK");
+
     }
 
     @Override
-    public Mono<String> deletePolicy(String policyId) {
-        this.policies.removeId(policyId);
-        return Mono.just("OK");
+    public Mono<String> deletePolicy(Policy policy) {
+        this.policies.remove(policy);
+        return mono("OK");
     }
 
     public Policies getPolicies() {
@@ -88,7 +95,44 @@ public class MockA1Client implements A1Client {
 
     @Override
     public Mono<A1ProtocolType> getProtocolVersion() {
-        return Mono.just(A1ProtocolType.STD_V1);
+        return mono(A1ProtocolType.STD_V1_1);
+    }
+
+    @Override
+    public Flux<String> deleteAllPolicies() {
+        this.policies.clear();
+        return mono("OK") //
+            .flatMapMany(Flux::just);
+    }
+
+    @Override
+    public Mono<String> getPolicyStatus(Policy policy) {
+        return mono("OK");
+    }
+
+    private <T> Mono<T> mono(T value) {
+        if (this.asynchDelay.isZero()) {
+            return Mono.just(value);
+        } else {
+            return Mono.create(monoSink -> asynchResponse(monoSink, value));
+        }
+    }
+
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    private void sleep() {
+        try {
+            Thread.sleep(this.asynchDelay.toMillis());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private <T> void asynchResponse(MonoSink<T> callback, T str) {
+        Thread thread = new Thread(() -> {
+            sleep(); // Simulate a network delay
+            callback.success(str);
+        });
+        thread.start();
     }
 
 }