Bugfix in RIC synchronization
[nonrtric.git] / policy-agent / src / test / java / org / oransc / policyagent / utils / MockA1Client.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.utils;
22
23 import java.time.Duration;
24 import java.util.List;
25 import java.util.Vector;
26
27 import org.oransc.policyagent.clients.A1Client;
28 import org.oransc.policyagent.repository.Policies;
29 import org.oransc.policyagent.repository.Policy;
30 import org.oransc.policyagent.repository.PolicyType;
31 import org.oransc.policyagent.repository.PolicyTypes;
32
33 import reactor.core.publisher.Flux;
34 import reactor.core.publisher.Mono;
35 import reactor.core.publisher.MonoSink;
36
37 public class MockA1Client implements A1Client {
38     Policies policies = new Policies();
39     private final PolicyTypes policyTypes;
40     private final Duration asynchDelay;
41
42     public MockA1Client(PolicyTypes policyTypes, Duration asynchDelay) {
43         this.policyTypes = policyTypes;
44         this.asynchDelay = asynchDelay;
45     }
46
47     @Override
48     public Mono<List<String>> getPolicyTypeIdentities() {
49         synchronized (this.policyTypes) {
50             List<String> result = new Vector<>();
51             for (PolicyType p : this.policyTypes.getAll()) {
52                 result.add(p.name());
53             }
54             return mono(result);
55         }
56     }
57
58     @Override
59     public Mono<List<String>> getPolicyIdentities() {
60         synchronized (this.policies) {
61             Vector<String> result = new Vector<>();
62             for (Policy policy : policies.getAll()) {
63                 result.add(policy.id());
64             }
65
66             return mono(result);
67         }
68     }
69
70     @Override
71     public Mono<String> getPolicyTypeSchema(String policyTypeId) {
72         try {
73             return mono(this.policyTypes.getType(policyTypeId).schema());
74         } catch (Exception e) {
75             return Mono.error(e);
76         }
77     }
78
79     @Override
80     public Mono<String> putPolicy(Policy p) {
81         this.policies.put(p);
82         return mono("OK");
83
84     }
85
86     @Override
87     public Mono<String> deletePolicy(Policy policy) {
88         this.policies.remove(policy);
89         return mono("OK");
90     }
91
92     public Policies getPolicies() {
93         return this.policies;
94     }
95
96     @Override
97     public Mono<A1ProtocolType> getProtocolVersion() {
98         return mono(A1ProtocolType.STD_V1_1);
99     }
100
101     @Override
102     public Flux<String> deleteAllPolicies() {
103         this.policies.clear();
104         return mono("OK") //
105             .flatMapMany(Flux::just);
106     }
107
108     @Override
109     public Mono<String> getPolicyStatus(Policy policy) {
110         return mono("OK");
111     }
112
113     private <T> Mono<T> mono(T value) {
114         if (this.asynchDelay.isZero()) {
115             return Mono.just(value);
116         } else {
117             return Mono.create(monoSink -> asynchResponse(monoSink, value));
118         }
119     }
120
121     @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
122     private void sleep() {
123         try {
124             Thread.sleep(this.asynchDelay.toMillis());
125         } catch (InterruptedException e) {
126             e.printStackTrace();
127         }
128     }
129
130     private <T> void asynchResponse(MonoSink<T> callback, T str) {
131         Thread thread = new Thread(() -> {
132             sleep(); // Simulate a network delay
133             callback.success(str);
134         });
135         thread.start();
136     }
137
138 }