+ this.infoTypes.remove(this.infoTypes.getType(TYPE_ID));
+ assertThat(this.infoJobs.size()).isZero();
+ }
+
+ @Test
+ void testConsumerTypeSubscriptionDatabase() {
+ final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+
+ // PUT a subscription
+ String body = gson.toJson(info);
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+ InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ assertThat(restoredSubscriptions.size()).isEqualTo(1);
+ assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1);
+
+ // Delete the subscription
+ restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ assertThat(restoredSubscriptions.size()).isZero();
+ }
+
+ @Test
+ void testConsumerTypeSubscription() throws Exception {
+
+ final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+
+ testErrorCode(restClient().get(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+ "Could not find Information subscription: junk");
+
+ testErrorCode(restClient().delete(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
+ "Could not find Information subscription: junk");
+
+ {
+ // PUT a subscription
+ String body = gson.toJson(info);
+ ResponseEntity<String> resp =
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
+ resp = restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
+ }
+ {
+ // GET IDs
+ ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl()).block();
+ assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
+ resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=owner").block();
+ assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
+ resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=junk").block();
+ assertThat(resp.getBody()).isEqualTo("[]");
+ }
+
+ {
+ // GET the individual subscription
+ ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ ConsumerTypeSubscriptionInfo respInfo = gson.fromJson(resp.getBody(), ConsumerTypeSubscriptionInfo.class);
+ assertThat(respInfo).isEqualTo(info);
+ }
+
+ {
+ // Test the callbacks
+ final ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+
+ // Test callback for PUT type
+ this.putInfoType(TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(1));
+ assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(0).state)
+ .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED);
+
+ // Test callback for DELETE type
+ this.deleteInfoType(TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(2));
+ assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(1).state)
+ .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED);
+ }
+
+ {
+ // DELETE the subscription
+ ResponseEntity<String> resp =
+ restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);
+ assertThat(this.infoTypeSubscriptions.size()).isZero();
+ resp = restClient().getForEntity(typeSubscriptionUrl()).block();
+ assertThat(resp.getBody()).isEqualTo("[]");
+ }
+ }
+
+ @Test
+ void testRemovingNonWorkingSubscription() throws Exception {
+ // Test that subscriptions are removed for a unresponsive consumer
+
+ // PUT a subscription with a junk callback
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+ String body = gson.toJson(info);
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+ this.putInfoType(TYPE_ID);
+ // The callback will fail and the subscription will be removed
+ await().untilAsserted(() -> assertThat(this.infoTypeSubscriptions.size()).isZero());