+ String json = gson.toJson(service);
+ return json;
+ }
+
+ private void putService(String name) {
+ putService(name, 0, null);
+ }
+
+ private void putService(String name, long keepAliveIntervalSeconds, @Nullable HttpStatus expectedStatus) {
+ String url = "/service";
+ String body = createServiceJson(name, keepAliveIntervalSeconds);
+ ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
+ if (expectedStatus != null) {
+ assertEquals(expectedStatus, resp.getStatusCode(), "");
+ }
+ }
+
+ private String baseUrl() {
+ return "https://localhost:" + port;
+ }
+
+ private String jsonString() {
+ return "{\"servingCellNrcgi\":\"1\"}";
+ }
+
+ @Test
+ void testConcurrency() throws Exception {
+ final Instant startTime = Instant.now();
+ List<Thread> threads = new ArrayList<>();
+ a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+ addRic("ric");
+ addPolicyType("type1", "ric");
+ addPolicyType("type2", "ric");
+
+ for (int i = 0; i < 10; ++i) {
+ Thread t =
+ new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
+ "TestThread_" + i);
+ t.start();
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ assertThat(policies.size()).isEqualTo(0);
+ logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
+ }
+
+ private AsyncRestClient restClient(boolean useTrustValidation) {
+ WebClientConfig config = this.applicationConfig.getWebClientConfig();
+ config = ImmutableWebClientConfig.builder() //
+ .isTrustStoreUsed(useTrustValidation) //
+ .trustStore(config.trustStore()) //
+ .trustStorePassword(config.trustStorePassword()) //
+ .build();
+
+ return new AsyncRestClient(baseUrl(), config);
+ }
+
+ private AsyncRestClient restClient() {
+ return restClient(false);
+ }
+
+ private void testErrorCode(Mono<?> request, HttpStatus expStatus) {
+ testErrorCode(request, expStatus, "");
+ }
+
+ private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
+ StepVerifier.create(request) //
+ .expectSubscription() //
+ .expectErrorMatches(t -> checkWebClientError(t, expStatus, responseContains)) //
+ .verify();
+ }
+
+ private boolean checkWebClientError(Throwable t, HttpStatus expStatus, String responseContains) {
+ assertTrue(t instanceof WebClientResponseException);
+ WebClientResponseException e = (WebClientResponseException) t;
+ assertThat(e.getStatusCode()).isEqualTo(expStatus);
+ assertThat(e.getResponseBodyAsString()).contains(responseContains);
+ return true;
+ }
+
+ private MockA1Client getA1Client(String ricName) throws ServiceException {
+ return a1ClientFactory.getOrCreateA1Client(ricName);
+ }
+
+ private PolicyType createPolicyType(String policyTypeName) {
+ return ImmutablePolicyType.builder() //
+ .name(policyTypeName) //
+ .schema("{\"title\":\"" + policyTypeName + "\"}") //
+ .build();
+ }
+
+ private PolicyType addPolicyType(String policyTypeName, String ricName) {
+ PolicyType type = createPolicyType(policyTypeName);
+ policyTypes.put(type);
+ addRic(ricName).addSupportedPolicyType(type);
+ return type;
+ }
+
+ private Ric addRic(String ricName) {
+ return addRic(ricName, null);
+ }
+
+ private Ric addRic(String ricName, String managedElement) {
+ if (rics.get(ricName) != null) {
+ return rics.get(ricName);
+ }
+ List<String> mes = new ArrayList<>();
+ if (managedElement != null) {
+ mes.add(managedElement);
+ }
+ RicConfig conf = ImmutableRicConfig.builder() //
+ .name(ricName) //
+ .baseUrl(ricName) //
+ .managedElementIds(mes) //
+ .controllerName("") //
+ .build();
+ Ric ric = new Ric(conf);
+ ric.setState(Ric.RicState.AVAILABLE);
+ this.rics.put(ric);
+ return ric;