package org.oransc.policyagent.clients;
-import java.util.Collection;
+import java.util.List;
import org.oransc.policyagent.repository.Policy;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface A1Client {
public static enum A1ProtocolType {
- UNKNOWN, STD_V1
+ UNKNOWN, STD_V1, OSC_V1
}
public Mono<A1ProtocolType> getProtocolVersion();
- public Mono<Collection<String>> getPolicyTypeIdentities();
+ public Mono<List<String>> getPolicyTypeIdentities();
- public Mono<Collection<String>> getPolicyIdentities();
+ public Mono<List<String>> getPolicyIdentities();
public Mono<String> getPolicyTypeSchema(String policyTypeId);
public Mono<String> putPolicy(Policy policy);
- public Mono<String> deletePolicy(String policyId);
+ public Mono<String> deletePolicy(Policy policy);
+
+ public Flux<String> deleteAllPolicies();
}
private Mono<A1Client> createA1Client(Ric ric, A1ProtocolType version) {
if (version == A1ProtocolType.STD_V1) {
return Mono.just(createStdA1ClientImpl(ric));
+ } else if (version == A1ProtocolType.OSC_V1) {
+ return Mono.just(new OscA1Client(ric.getConfig()));
}
return Mono.error(new ServiceException("Not supported protocoltype: " + version));
}
*/
package org.oransc.policyagent.clients;
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
public class AsyncRestClient {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final WebClient client;
private static class AsyncRestClientException extends Exception {
}
public Mono<String> put(String uri, String body) {
+ logger.debug("PUT uri = '{}''", uri);
return client.put() //
.uri(uri) //
.contentType(MediaType.APPLICATION_JSON) //
}
public Mono<String> get(String uri) {
+ logger.debug("GET uri = '{}''", uri);
return client.get() //
.uri(uri) //
.retrieve() //
}
public Mono<String> delete(String uri) {
+ logger.debug("DELETE uri = '{}''", uri);
return client.delete() //
.uri(uri) //
.retrieve() //
package org.oransc.policyagent.clients;
import java.lang.invoke.MethodHandles;
-import java.util.Collection;
+import java.util.ArrayList;
+import java.util.List;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
import org.oransc.policyagent.configuration.RicConfig;
import org.oransc.policyagent.repository.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class OscA1Client implements A1Client {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final RicConfig ricConfig;
+ private final AsyncRestClient restClient;
public OscA1Client(RicConfig ricConfig) {
- this.ricConfig = ricConfig;
- logger.debug("OscA1Client for ric: {}", this.ricConfig.name());
+ String baseUrl = ricConfig.baseUrl() + "/a1-p";
+ this.restClient = new AsyncRestClient(baseUrl);
+ logger.debug("OscA1Client for ric: {}", ricConfig.name());
}
@Override
- public Mono<Collection<String>> getPolicyTypeIdentities() {
- return Mono.error(new Exception("Not impl"));
+ public Mono<List<String>> getPolicyTypeIdentities() {
+ return restClient.get("/policytypes") //
+ .flatMap(this::parseJsonArrayOfString);
}
@Override
- public Mono<Collection<String>> getPolicyIdentities() {
- return Mono.error(new Exception("Not impl"));
+ public Mono<List<String>> getPolicyIdentities() {
+ return getPolicyTypeIdentities() //
+ .flatMapMany(types -> Flux.fromIterable(types)) //
+ .flatMap(type -> getPolicyIdentities(type)) //
+ .flatMap(policyIds -> Flux.fromIterable(policyIds)) //
+ .collectList();
+ }
+
+ private Mono<List<String>> getPolicyIdentities(String typeId) {
+ return restClient.get("/policytypes/" + typeId + "/policies") //
+ .flatMap(this::parseJsonArrayOfString);
}
@Override
public Mono<String> getPolicyTypeSchema(String policyTypeId) {
- return Mono.error(new Exception("Not impl"));
+ return restClient.get("/policytypes/" + policyTypeId) //
+ .flatMap(response -> getCreateSchema(response, policyTypeId));
+ }
+
+ private Mono<String> getCreateSchema(String policyTypeResponse, String policyTypeId) {
+ try {
+ JSONObject obj = new JSONObject(policyTypeResponse);
+ JSONObject schemaObj = obj.getJSONObject("create_schema");
+ schemaObj.put("title", policyTypeId);
+ return Mono.just(schemaObj.toString());
+ } catch (Exception e) {
+ logger.error("Unexcpected response for policy type: {}", policyTypeResponse, e);
+ return Mono.error(e);
+ }
}
@Override
public Mono<String> putPolicy(Policy policy) {
- return Mono.error(new Exception("Not impl"));
+ return restClient.put("/policytypes/" + policy.type().name() + "/policies/" + policy.id(), policy.json());
}
@Override
- public Mono<String> deletePolicy(String policyId) {
- return Mono.error(new Exception("Not impl"));
+ public Mono<String> deletePolicy(Policy policy) {
+ return deletePolicy(policy.type().name(), policy.id());
+ }
+
+ private Mono<String> deletePolicy(String typeId, String policyId) {
+ return restClient.delete("/policytypes/" + typeId + "/policies/" + policyId);
}
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
- return Mono.error(new Exception("Not impl"));
+ return restClient.get("/healthcheck") //
+ .flatMap(resp -> Mono.just(A1ProtocolType.OSC_V1));
}
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ return getPolicyTypeIdentities() //
+ .flatMapMany(types -> Flux.fromIterable(types)) //
+ .flatMap(typeId -> deletePoliciesForType(typeId)); //
+ }
+
+ private Flux<String> deletePoliciesForType(String typeId) {
+ return getPolicyIdentities(typeId) //
+ .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
+ .flatMap(policyId -> deletePolicy(typeId, policyId)); //
+ }
+
+ private Mono<List<String>> parseJsonArrayOfString(String inputString) {
+ try {
+ List<String> arrayList = new ArrayList<>();
+ JSONArray jsonArray = new JSONArray(inputString);
+ for (int i = 0; i < jsonArray.length(); i++) {
+ arrayList.add(jsonArray.getString(i));
+ }
+ logger.debug("A1 client: received list = {}", arrayList);
+ return Mono.just(arrayList);
+ } catch (JSONException ex) { // invalid json
+ return Mono.error(ex);
+ }
+ }
}
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.json.JSONArray;
import org.oransc.policyagent.repository.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class StdA1Client implements A1Client {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final RicConfig ricConfig;
private final AsyncRestClient restClient;
public StdA1Client(RicConfig ricConfig) {
- this.ricConfig = ricConfig;
- this.restClient = new AsyncRestClient(getBaseUrl());
+ String baseUrl = ricConfig.baseUrl() + "/A1-P/v1";
+ this.restClient = new AsyncRestClient(baseUrl);
}
public StdA1Client(RicConfig ricConfig, AsyncRestClient restClient) {
- this.ricConfig = ricConfig;
this.restClient = restClient;
}
@Override
- public Mono<Collection<String>> getPolicyTypeIdentities() {
- logger.debug("getPolicyTypeIdentities nearRtRicUrl = {}", ricConfig.baseUrl());
+ public Mono<List<String>> getPolicyTypeIdentities() {
return restClient.get("/policytypes/identities") //
.flatMap(this::parseJsonArrayOfString);
}
@Override
- public Mono<Collection<String>> getPolicyIdentities() {
- logger.debug("getPolicyIdentities nearRtRicUrl = {}", ricConfig.baseUrl());
+ public Mono<List<String>> getPolicyIdentities() {
return restClient.get("/policies/identities") //
.flatMap(this::parseJsonArrayOfString);
}
@Override
public Mono<String> getPolicyTypeSchema(String policyTypeId) {
- logger.debug("getPolicyType nearRtRicUrl = {}, policyTypeId = {}", ricConfig.baseUrl(), policyTypeId);
Mono<String> response = restClient.get("/policytypes/" + policyTypeId);
return response.flatMap(this::createMono);
}
@Override
public Mono<String> putPolicy(Policy policy) {
- logger.debug("putPolicy nearRtRicUrl = {}, policyId = {}, policyString = {}", //
- policy.ric().getConfig().baseUrl(), policy.id(), policy.json());
// TODO update when simulator is updated to include policy type
// Mono<String> response = client.put("/policies/" + policy.id() + "?policyTypeId=" + policy.type().name(),
// policy.json());
}
@Override
- public Mono<String> deletePolicy(String policyId) {
- logger.debug("deletePolicy nearRtRicUrl = {}, policyId = {}", ricConfig.baseUrl(), policyId);
- return restClient.delete("/policies/" + policyId);
+ public Mono<String> deletePolicy(Policy policy) {
+ return deletePolicy(policy.id());
+ }
+
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ return getPolicyIdentities() //
+ .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // )
+ .flatMap(policyId -> deletePolicy(policyId)); //
}
@Override
.flatMap(x -> Mono.just(A1ProtocolType.STD_V1));
}
- private String getBaseUrl() {
- return ricConfig.baseUrl() + "/A1-P/v1";
+ private Mono<String> deletePolicy(String policyId) {
+ return restClient.delete("/policies/" + policyId);
}
- private Mono<Collection<String>> parseJsonArrayOfString(String inputString) {
+ private Mono<List<String>> parseJsonArrayOfString(String inputString) {
try {
List<String> arrayList = new ArrayList<>();
JSONArray jsonArray = new JSONArray(inputString);
if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) {
policies.remove(policy);
return a1ClientFactory.createA1Client(policy.ric()) //
- .flatMap(client -> client.deletePolicy(id)) //
+ .flatMap(client -> client.deletePolicy(policy)) //
.flatMap(notUsed -> {
return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
});
private Flux<Object> startRecover(Ric ric, A1Client a1Client) {
Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric, a1Client);
- Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric, a1Client);
+ Flux<?> deletePoliciesInRic = a1Client.deleteAllPolicies();
Flux<?> recreatePoliciesInRic = recreateAllPoliciesInRic(ric, a1Client);
return Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic);
Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
.flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client));
Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(a1Client -> deleteAllPoliciesInRic(ric, a1Client));
+ .flatMapMany(a1Client -> a1Client.deleteAllPolicies());
Flux.merge(recoverTypes, deletePoliciesInRic) //
.subscribe(x -> logger.debug("Brute recover: " + x), //
}
}
- private Flux<String> deleteAllPoliciesInRic(Ric ric, A1Client a1Client) {
- return a1Client.getPolicyIdentities() //
- .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
- .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
- .flatMap(policyId -> a1Client.deletePolicy(policyId)); //
- }
-
private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
synchronized (policies) {
return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
private Mono<Policy> deletePolicyInRic(Policy policy) {
return a1ClientFactory.createA1Client(policy.ric()) //
- .flatMap(client -> client.deletePolicy(policy.id()) //
+ .flatMap(client -> client.deletePolicy(policy) //
.onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
.map((nothing) -> policy));
}
public void testDeletePolicy() {
when(asyncRestClientMock.delete(POLICIES_URL + POLICY_1_ID)).thenReturn(Mono.empty());
- Mono<?> responseMono = a1Client.deletePolicy(POLICY_1_ID);
+ Policy policy = createPolicy(RIC_URL, POLICY_1_ID, POLICY_JSON_VALID, POLICY_TYPE);
+ Mono<?> responseMono = a1Client.deletePolicy(policy);
verify(asyncRestClientMock).delete(POLICIES_URL + POLICY_1_ID);
StepVerifier.create(responseMono).expectComplete().verify();
}
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.List;
import java.util.Vector;
import org.junit.jupiter.api.BeforeEach;
import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
RepositorySupervision supervisorUnderTest =
new RepositorySupervision(rics, policies, a1ClientFactory, types, services);
- Mono<Collection<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
+ Mono<List<String>> policyIds = Mono.just(Arrays.asList("policyId1", "policyId2"));
doReturn(policyIds).when(a1ClientMock).getPolicyTypeIdentities();
doReturn(policyIds).when(a1ClientMock).getPolicyIdentities();
- doReturn(Mono.empty()).when(a1ClientMock).deletePolicy(anyString());
doReturn(Mono.just("schema")).when(a1ClientMock).getPolicyTypeSchema(anyString());
doReturn(Mono.just("OK")).when(a1ClientMock).putPolicy(any());
+ doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
supervisorUnderTest.checkAllRics();
await().untilAsserted(() -> RicState.IDLE.equals(ric2.state()));
await().untilAsserted(() -> RicState.IDLE.equals(ric3.state()));
- verify(a1ClientMock, times(3)).deletePolicy("policyId2");
+ verify(a1ClientMock, times(3)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
}
}
import static org.oransc.policyagent.repository.Ric.RicState.IDLE;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.List;
import java.util.Vector;
import org.junit.jupiter.api.BeforeEach;
import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
@Test
public void startup_allOk() {
- Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
- Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
+ Mono<List<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<List<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
doReturn(policyTypes1, policyTypes2).when(a1ClientMock).getPolicyTypeIdentities();
- Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
- doReturn(policies).when(a1ClientMock).getPolicyIdentities();
doReturn(Mono.just("Schema")).when(a1ClientMock).getPolicyTypeSchema(anyString());
- doReturn(Mono.just("OK")).when(a1ClientMock).deletePolicy(anyString());
+ doReturn(Flux.just("OK")).when(a1ClientMock).deleteAllPolicies();
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
await().untilAsserted(() -> assertThat(policyTypes.size()).isEqualTo(2));
- verify(a1ClientMock, times(2)).getPolicyTypeIdentities();
- verify(a1ClientMock, times(2)).deletePolicy(POLICY_ID_1);
- verify(a1ClientMock, times(2)).deletePolicy(POLICY_ID_2);
+ verify(a1ClientMock, times(2)).deleteAllPolicies();
assertTrue(policyTypes.contains(POLICY_TYPE_1_NAME), POLICY_TYPE_1_NAME + " not added to PolicyTypes.");
assertTrue(policyTypes.contains(POLICY_TYPE_2_NAME), POLICY_TYPE_2_NAME + " not added to PolicyTypes.");
public void startup_unableToConnectToGetTypes() {
Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
doReturn(error, error).when(a1ClientMock).getPolicyTypeIdentities();
- doReturn(error).when(a1ClientMock).getPolicyIdentities();
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
}
@Test
- public void startup_unableToConnectToGetPolicies() {
+ public void startup_unableToConnectToDeleteAllPolicies() {
- Mono<Collection<String>> policyTypes = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ Mono<List<String>> policyTypes = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(policyTypes);
when(a1ClientMock.getPolicyTypeSchema(anyString())).thenReturn(Mono.just("Schema"));
- Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
- doReturn(error).when(a1ClientMock).getPolicyIdentities();
+ Flux<?> error = Flux.error(new Exception("Unable to contact ric."));
+ doReturn(error).when(a1ClientMock).deleteAllPolicies();
Rics rics = new Rics();
StartupService serviceUnderTest = new StartupService(appConfigMock, refreshTaskMock, rics, new PolicyTypes(),
package org.oransc.policyagent.utils;
-import java.util.Collection;
+import java.util.List;
import java.util.Vector;
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.PolicyType;
import org.oransc.policyagent.repository.PolicyTypes;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class MockA1Client implements A1Client {
}
@Override
- public Mono<Collection<String>> getPolicyTypeIdentities() {
+ public Mono<List<String>> getPolicyTypeIdentities() {
synchronized (this.policyTypes) {
- Vector<String> result = new Vector<>();
+ List<String> result = new Vector<>();
for (PolicyType p : this.policyTypes.getAll()) {
result.add(p.name());
}
}
@Override
- public Mono<Collection<String>> getPolicyIdentities() {
+ public Mono<List<String>> getPolicyIdentities() {
synchronized (this.policies) {
Vector<String> result = new Vector<>();
for (Policy policy : policies.getAll()) {
}
@Override
- public Mono<String> deletePolicy(String policyId) {
- this.policies.removeId(policyId);
+ public Mono<String> deletePolicy(Policy policy) {
+ this.policies.remove(policy);
return Mono.just("OK");
}
return Mono.just(A1ProtocolType.STD_V1);
}
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ this.policies.clear();
+ return Flux.empty();
+ }
+
}