this.infoJobs.clear();
this.infoTypes.clear();
this.infoProducers.clear();
+ this.infoTypeSubscriptions.clear();
this.producerSimulator.getTestResults().reset();
this.consumerSimulator.getTestResults().reset();
}
putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
verifyJobStatus(EI_JOB_ID, "ENABLED");
-
}
@Test
ResponseEntity<String> resp = restClient().getForEntity(url).block();
ProducerInfoTypeInfo info = gson.fromJson(resp.getBody(), ProducerInfoTypeInfo.class);
assertThat(info.jobDataSchema).isNotNull();
+ assertThat(info.typeSpecificInformation).isNotNull();
}
@Test
InfoTypes types = new InfoTypes(this.applicationConfig);
types.restoreTypesFromDatabase();
assertThat(types.size()).isEqualTo(1);
-
}
{
// Restore the jobs, no jobs in database
assertThat(this.infoJobs.size()).isZero();
}
+ @Test
+ void testConsumerTypeSubscriptionDatabase() {
+ final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+ // PUT a subscription
+ String body = gson.toJson(info);
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+ InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ assertThat(restoredSubscriptions.size()).isEqualTo(1);
+ assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1);
+
+ // Delete the subscription
+ restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+ restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ assertThat(restoredSubscriptions.size()).isZero();
+ }
+
@Test
void testConsumerTypeSubscription() throws Exception {
}
}
+ @Test
+ void testRemovingNonWorkingSubscription() throws Exception {
+ // Test that subscriptions are removed for a unresponsive consumer
+
+ // PUT a subscription with a junk callback
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+ String body = gson.toJson(info);
+ restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+ assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+ this.putInfoType(TYPE_ID);
+ // The callback will fail and the subscription will be removed
+ await().untilAsserted(() -> assertThat(this.infoTypeSubscriptions.size()).isZero());
+ }
+
@Test
void testTypeSubscriptionErrorCodes() throws Exception {
ProducerInfoTypeInfo producerEiTypeRegistrationInfo(String typeId)
throws JsonMappingException, JsonProcessingException {
- return new ProducerInfoTypeInfo(jsonSchemaObject());
+ return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
}
ProducerRegistrationInfo producerEiRegistratioInfoRejecting(String typeId)
}
}
+ private Object typeSpecifcInfoObject() {
+ return jsonObject("{ \"propertyName\" : \"value\" }");
+ }
+
private Object jsonSchemaObject() {
// a json schema with one mandatory property named "string"
String schemaStr = "{" //