NONRTRIC - Enrichment Coordinator Service, making type availability subscriptions...
[nonrtric.git] / enrichment-coordinator-service / src / test / java / org / oransc / enrichment / ApplicationTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.enrichment;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26
27 import com.fasterxml.jackson.core.JsonProcessingException;
28 import com.fasterxml.jackson.databind.JsonMappingException;
29 import com.google.gson.Gson;
30 import com.google.gson.GsonBuilder;
31 import com.google.gson.JsonParser;
32
33 import java.io.FileNotFoundException;
34 import java.io.FileOutputStream;
35 import java.io.PrintStream;
36 import java.lang.invoke.MethodHandles;
37 import java.util.Arrays;
38
39 import org.json.JSONObject;
40 import org.junit.jupiter.api.AfterEach;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.extension.ExtendWith;
44 import org.oransc.enrichment.clients.AsyncRestClient;
45 import org.oransc.enrichment.clients.AsyncRestClientFactory;
46 import org.oransc.enrichment.configuration.ApplicationConfig;
47 import org.oransc.enrichment.configuration.ImmutableHttpProxyConfig;
48 import org.oransc.enrichment.configuration.ImmutableWebClientConfig;
49 import org.oransc.enrichment.configuration.WebClientConfig;
50 import org.oransc.enrichment.configuration.WebClientConfig.HttpProxyConfig;
51 import org.oransc.enrichment.controller.ConsumerSimulatorController;
52 import org.oransc.enrichment.controller.ProducerSimulatorController;
53 import org.oransc.enrichment.controllers.a1e.A1eConsts;
54 import org.oransc.enrichment.controllers.a1e.A1eEiJobInfo;
55 import org.oransc.enrichment.controllers.a1e.A1eEiJobStatus;
56 import org.oransc.enrichment.controllers.a1e.A1eEiTypeInfo;
57 import org.oransc.enrichment.controllers.r1consumer.ConsumerConsts;
58 import org.oransc.enrichment.controllers.r1consumer.ConsumerInfoTypeInfo;
59 import org.oransc.enrichment.controllers.r1consumer.ConsumerJobInfo;
60 import org.oransc.enrichment.controllers.r1consumer.ConsumerJobStatus;
61 import org.oransc.enrichment.controllers.r1consumer.ConsumerTypeRegistrationInfo;
62 import org.oransc.enrichment.controllers.r1consumer.ConsumerTypeSubscriptionInfo;
63 import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
64 import org.oransc.enrichment.controllers.r1producer.ProducerConsts;
65 import org.oransc.enrichment.controllers.r1producer.ProducerInfoTypeInfo;
66 import org.oransc.enrichment.controllers.r1producer.ProducerJobInfo;
67 import org.oransc.enrichment.controllers.r1producer.ProducerRegistrationInfo;
68 import org.oransc.enrichment.controllers.r1producer.ProducerStatusInfo;
69 import org.oransc.enrichment.exceptions.ServiceException;
70 import org.oransc.enrichment.repository.InfoJob;
71 import org.oransc.enrichment.repository.InfoJobs;
72 import org.oransc.enrichment.repository.InfoProducer;
73 import org.oransc.enrichment.repository.InfoProducers;
74 import org.oransc.enrichment.repository.InfoType;
75 import org.oransc.enrichment.repository.InfoTypeSubscriptions;
76 import org.oransc.enrichment.repository.InfoTypes;
77 import org.oransc.enrichment.tasks.ProducerSupervision;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80 import org.springframework.beans.factory.annotation.Autowired;
81 import org.springframework.boot.test.context.SpringBootTest;
82 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
83 import org.springframework.boot.test.context.TestConfiguration;
84 import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
85 import org.springframework.boot.web.server.LocalServerPort;
86 import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
87 import org.springframework.context.ApplicationContext;
88 import org.springframework.context.annotation.Bean;
89 import org.springframework.http.HttpStatus;
90 import org.springframework.http.MediaType;
91 import org.springframework.http.ResponseEntity;
92 import org.springframework.test.context.TestPropertySource;
93 import org.springframework.test.context.junit.jupiter.SpringExtension;
94 import org.springframework.web.reactive.function.client.WebClientResponseException;
95
96 import reactor.core.publisher.Mono;
97 import reactor.test.StepVerifier;
98
99 @ExtendWith(SpringExtension.class)
100 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
101 @TestPropertySource(
102     properties = { //
103         "server.ssl.key-store=./config/keystore.jks", //
104         "app.webclient.trust-store=./config/truststore.jks", //
105         "app.vardata-directory=./target"})
106 class ApplicationTest {
107     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
108
109     private final String TYPE_ID = "typeId";
110     private final String PRODUCER_ID = "producerId";
111     private final String EI_JOB_PROPERTY = "\"property1\"";
112     private final String EI_JOB_ID = "jobId";
113
114     @Autowired
115     ApplicationContext context;
116
117     @Autowired
118     InfoJobs infoJobs;
119
120     @Autowired
121     InfoTypes infoTypes;
122
123     @Autowired
124     InfoProducers infoProducers;
125
126     @Autowired
127     ApplicationConfig applicationConfig;
128
129     @Autowired
130     ProducerSimulatorController producerSimulator;
131
132     @Autowired
133     ConsumerSimulatorController consumerSimulator;
134
135     @Autowired
136     ProducerSupervision producerSupervision;
137
138     @Autowired
139     ProducerCallbacks producerCallbacks;
140
141     @Autowired
142     InfoTypeSubscriptions infoTypeSubscriptions;
143
144     private static Gson gson = new GsonBuilder().create();
145
146     /**
147      * Overrides the BeanFactory.
148      */
149     @TestConfiguration
150     static class TestBeanFactory {
151         @Bean
152         public ServletWebServerFactory servletContainer() {
153             return new TomcatServletWebServerFactory();
154         }
155     }
156
    // Port of the embedded server (the test runs with WebEnvironment.RANDOM_PORT);
    // injected through @LocalServerPort.
    @LocalServerPort
    private int port;
159
160     @BeforeEach
161     void reset() {
162         this.infoJobs.clear();
163         this.infoTypes.clear();
164         this.infoProducers.clear();
165         this.infoTypeSubscriptions.clear();
166         this.producerSimulator.getTestResults().reset();
167         this.consumerSimulator.getTestResults().reset();
168     }
169
170     @AfterEach
171     void check() {
172         assertThat(this.producerSimulator.getTestResults().errorFound).isFalse();
173     }
174
175     @Test
176     void generateApiDoc() throws FileNotFoundException {
177         String url = "/v3/api-docs";
178         ResponseEntity<String> resp = restClient().getForEntity(url).block();
179         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
180
181         JSONObject jsonObj = new JSONObject(resp.getBody());
182         assertThat(jsonObj.remove("servers")).isNotNull();
183
184         String indented = jsonObj.toString(4);
185         try (PrintStream out = new PrintStream(new FileOutputStream("api/ecs-api.json"))) {
186             out.print(indented);
187         }
188     }
189
190     @Test
191     void a1eGetEiTypes() throws Exception {
192         putInfoProducerWithOneType(PRODUCER_ID, "test");
193         String url = A1eConsts.API_ROOT + "/eitypes";
194         String rsp = restClient().get(url).block();
195         assertThat(rsp).isEqualTo("[\"test\"]");
196     }
197
198     @Test
199     void consumerGetInfoTypes() throws Exception {
200         putInfoProducerWithOneType(PRODUCER_ID, "test");
201         String url = ConsumerConsts.API_ROOT + "/info-types";
202         String rsp = restClient().get(url).block();
203         assertThat(rsp).isEqualTo("[\"test\"]");
204     }
205
206     @Test
207     void a1eGetEiTypesEmpty() throws Exception {
208         String url = A1eConsts.API_ROOT + "/eitypes";
209         String rsp = restClient().get(url).block();
210         assertThat(rsp).isEqualTo("[]");
211     }
212
213     @Test
214     void consumerGetEiTypesEmpty() throws Exception {
215         String url = ConsumerConsts.API_ROOT + "/info-types";
216         String rsp = restClient().get(url).block();
217         assertThat(rsp).isEqualTo("[]");
218     }
219
220     @Test
221     void a1eGetEiType() throws Exception {
222         putInfoProducerWithOneType(PRODUCER_ID, "test");
223         String url = A1eConsts.API_ROOT + "/eitypes/test";
224         String rsp = restClient().get(url).block();
225         A1eEiTypeInfo info = gson.fromJson(rsp, A1eEiTypeInfo.class);
226         assertThat(info).isNotNull();
227     }
228
229     @Test
230     void consumerGetEiType() throws Exception {
231         putInfoProducerWithOneType(PRODUCER_ID, "test");
232         String url = ConsumerConsts.API_ROOT + "/info-types/test";
233         String rsp = restClient().get(url).block();
234         ConsumerInfoTypeInfo info = gson.fromJson(rsp, ConsumerInfoTypeInfo.class);
235         assertThat(info).isNotNull();
236         assertThat(info.jobDataSchema).isNotNull();
237         assertThat(info.state).isEqualTo(ConsumerInfoTypeInfo.ConsumerTypeStatusValues.ENABLED);
238         assertThat(info.noOfProducers).isEqualTo(1);
239     }
240
241     @Test
242     void a1eGetEiTypeNotFound() throws Exception {
243         String url = A1eConsts.API_ROOT + "/eitypes/junk";
244         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND, "Information type not found: junk");
245     }
246
247     @Test
248     void consumerGetEiTypeNotFound() throws Exception {
249         String url = ConsumerConsts.API_ROOT + "/info-types/junk";
250         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND, "Information type not found: junk");
251     }
252
253     @Test
254     void a1eGetEiJobsIds() throws Exception {
255         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
256         putEiJob(TYPE_ID, "jobId");
257         final String JOB_ID_JSON = "[\"jobId\"]";
258         String url = A1eConsts.API_ROOT + "/eijobs?infoTypeId=typeId";
259         String rsp = restClient().get(url).block();
260         assertThat(rsp).isEqualTo(JOB_ID_JSON);
261
262         url = A1eConsts.API_ROOT + "/eijobs?owner=owner";
263         rsp = restClient().get(url).block();
264         assertThat(rsp).isEqualTo(JOB_ID_JSON);
265
266         url = A1eConsts.API_ROOT + "/eijobs?owner=JUNK";
267         rsp = restClient().get(url).block();
268         assertThat(rsp).isEqualTo("[]");
269
270         url = A1eConsts.API_ROOT + "/eijobs";
271         rsp = restClient().get(url).block();
272         assertThat(rsp).isEqualTo(JOB_ID_JSON);
273
274         url = A1eConsts.API_ROOT + "/eijobs?eiTypeId=typeId&&owner=owner";
275         rsp = restClient().get(url).block();
276         assertThat(rsp).isEqualTo(JOB_ID_JSON);
277
278         url = A1eConsts.API_ROOT + "/eijobs?eiTypeId=JUNK";
279         rsp = restClient().get(url).block();
280         assertThat(rsp).isEqualTo("[]");
281     }
282
283     @Test
284     void consumerGetInformationJobsIds() throws Exception {
285         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
286         putEiJob(TYPE_ID, "jobId");
287         final String JOB_ID_JSON = "[\"jobId\"]";
288         String url = ConsumerConsts.API_ROOT + "/info-jobs?infoTypeId=typeId";
289         String rsp = restClient().get(url).block();
290         assertThat(rsp).isEqualTo(JOB_ID_JSON);
291
292         url = ConsumerConsts.API_ROOT + "/info-jobs?owner=owner";
293         rsp = restClient().get(url).block();
294         assertThat(rsp).isEqualTo(JOB_ID_JSON);
295
296         url = ConsumerConsts.API_ROOT + "/info-jobs?owner=JUNK";
297         rsp = restClient().get(url).block();
298         assertThat(rsp).isEqualTo("[]");
299
300         url = ConsumerConsts.API_ROOT + "/info-jobs";
301         rsp = restClient().get(url).block();
302         assertThat(rsp).isEqualTo(JOB_ID_JSON);
303
304         url = ConsumerConsts.API_ROOT + "/info-jobs?infoTypeId=typeId&&owner=owner";
305         rsp = restClient().get(url).block();
306         assertThat(rsp).isEqualTo(JOB_ID_JSON);
307
308         url = ConsumerConsts.API_ROOT + "/info-jobs?infoTypeId=JUNK";
309         rsp = restClient().get(url).block();
310         assertThat(rsp).isEqualTo("[]");
311     }
312
313     @Test
314     void a1eGetEiJob() throws Exception {
315         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
316         putEiJob(TYPE_ID, "jobId");
317         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
318         String rsp = restClient().get(url).block();
319         A1eEiJobInfo info = gson.fromJson(rsp, A1eEiJobInfo.class);
320         assertThat(info.owner).isEqualTo("owner");
321         assertThat(info.eiTypeId).isEqualTo(TYPE_ID);
322     }
323
324     @Test
325     void consumerGetEiJob() throws Exception {
326         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
327         putEiJob(TYPE_ID, "jobId");
328         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
329         String rsp = restClient().get(url).block();
330         ConsumerJobInfo info = gson.fromJson(rsp, ConsumerJobInfo.class);
331         assertThat(info.owner).isEqualTo("owner");
332         assertThat(info.infoTypeId).isEqualTo(TYPE_ID);
333     }
334
335     @Test
336     void a1eGetEiJobNotFound() throws Exception {
337         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
338         String url = A1eConsts.API_ROOT + "/eijobs/junk";
339         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND, "Could not find Information job: junk");
340     }
341
342     @Test
343     void consumerGetInfoJobNotFound() throws Exception {
344         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
345         String url = ConsumerConsts.API_ROOT + "/info-jobs/junk";
346         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND, "Could not find Information job: junk");
347     }
348
349     @Test
350     void a1eGetEiJobStatus() throws Exception {
351         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
352         putEiJob(TYPE_ID, "jobId");
353
354         verifyJobStatus("jobId", "ENABLED");
355     }
356
357     @Test
358     void consumerGetEiJobStatus() throws Exception {
359         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
360         putEiJob(TYPE_ID, "jobId");
361
362         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId/status";
363         String rsp = restClient().get(url).block();
364         assertThat(rsp) //
365             .contains("ENABLED") //
366             .contains(PRODUCER_ID);
367
368         ConsumerJobStatus status = gson.fromJson(rsp, ConsumerJobStatus.class);
369         assertThat(status.producers).contains(PRODUCER_ID);
370     }
371
372     @Test
373     void a1eDeleteEiJob() throws Exception {
374         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
375         putEiJob(TYPE_ID, "jobId");
376         assertThat(this.infoJobs.size()).isEqualTo(1);
377         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
378         restClient().delete(url).block();
379         assertThat(this.infoJobs.size()).isZero();
380
381         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
382         await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(1));
383         assertThat(simulatorResults.jobsStopped.get(0)).isEqualTo("jobId");
384     }
385
386     @Test
387     void consumerDeleteEiJob() throws Exception {
388         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
389         putEiJob(TYPE_ID, "jobId");
390         assertThat(this.infoJobs.size()).isEqualTo(1);
391         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
392         restClient().delete(url).block();
393         assertThat(this.infoJobs.size()).isZero();
394
395         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
396         await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(1));
397         assertThat(simulatorResults.jobsStopped.get(0)).isEqualTo("jobId");
398     }
399
400     @Test
401     void a1eDeleteEiJobNotFound() throws Exception {
402         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
403         String url = A1eConsts.API_ROOT + "/eijobs/junk";
404         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND, "Could not find Information job: junk");
405     }
406
407     @Test
408     void consumerDeleteEiJobNotFound() throws Exception {
409         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
410         String url = ConsumerConsts.API_ROOT + "/info-jobs/junk";
411         testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND, "Could not find Information job: junk");
412     }
413
414     @Test
415     void a1ePutEiJob() throws Exception {
416         // Test that one producer accepting a job is enough
417         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
418         putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
419
420         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
421         String body = gson.toJson(infoJobInfo());
422         ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
423         assertThat(this.infoJobs.size()).isEqualTo(1);
424         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
425
426         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
427         await().untilAsserted(() -> assertThat(simulatorResults.jobsStarted.size()).isEqualTo(1));
428         ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
429         assertThat(request.id).isEqualTo("jobId");
430
431         // One retry --> two calls
432         await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
433         assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
434
435         resp = restClient().putForEntity(url, body).block();
436         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
437         InfoJob job = this.infoJobs.getJob("jobId");
438         assertThat(job.getOwner()).isEqualTo("owner");
439
440         verifyJobStatus(EI_JOB_ID, "ENABLED");
441     }
442
443     @Test
444     void consumerPutInformationJob() throws Exception {
445         // Test that one producer accepting a job is enough
446         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
447
448         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
449         String body = gson.toJson(consumerJobInfo());
450         ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
451         assertThat(this.infoJobs.size()).isEqualTo(1);
452         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
453
454         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
455         await().untilAsserted(() -> assertThat(simulatorResults.jobsStarted.size()).isEqualTo(1));
456         ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
457         assertThat(request.id).isEqualTo("jobId");
458
459         resp = restClient().putForEntity(url, body).block();
460         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
461         InfoJob job = this.infoJobs.getJob("jobId");
462         assertThat(job.getOwner()).isEqualTo("owner");
463
464         verifyJobStatus(EI_JOB_ID, "ENABLED");
465     }
466
467     @Test
468     void consumerPutInformationJob_noType() throws JsonMappingException, JsonProcessingException, ServiceException {
469         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId?typeCheck=false";
470         String body = gson.toJson(consumerJobInfo());
471         ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
472         assertThat(this.infoJobs.size()).isEqualTo(1);
473         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
474         verifyJobStatus(EI_JOB_ID, "DISABLED");
475
476         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
477
478         verifyJobStatus(EI_JOB_ID, "ENABLED");
479     }
480
481     @Test
482     void a1ePutEiJob_jsonSchemavalidationError() throws Exception {
483         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
484
485         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
486         // The element with name "property1" is mandatory in the schema
487         A1eEiJobInfo jobInfo = new A1eEiJobInfo("typeId", jsonObject("{ \"XXstring\" : \"value\" }"), "owner",
488             "targetUri", "jobStatusUrl");
489         String body = gson.toJson(jobInfo);
490
491         testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
492     }
493
494     @Test
495     void consumerPutJob_jsonSchemavalidationError() throws Exception {
496         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
497
498         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId?typeCheck=true";
499         // The element with name "property1" is mandatory in the schema
500         ConsumerJobInfo jobInfo =
501             new ConsumerJobInfo("typeId", jsonObject("{ \"XXstring\" : \"value\" }"), "owner", "targetUri", null);
502         String body = gson.toJson(jobInfo);
503
504         testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
505     }
506
507     @Test
508     void consumerPutJob_uriError() throws Exception {
509         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
510
511         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId?typeCheck=true";
512
513         ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(), "owner", "junk", null);
514         String body = gson.toJson(jobInfo);
515
516         testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "URI: junk is not absolute");
517     }
518
519     @Test
520     void a1eChangingEiTypeGetRejected() throws Exception {
521         putInfoProducerWithOneType("producer1", "typeId1");
522         putInfoProducerWithOneType("producer2", "typeId2");
523         putEiJob("typeId1", "jobId");
524
525         String url = A1eConsts.API_ROOT + "/eijobs/jobId";
526         String body = gson.toJson(infoJobInfo("typeId2", "jobId"));
527         testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT,
528             "Not allowed to change type for existing EI job");
529     }
530
531     @Test
532     void consumerChangingInfoTypeGetRejected() throws Exception {
533         putInfoProducerWithOneType("producer1", "typeId1");
534         putInfoProducerWithOneType("producer2", "typeId2");
535         putEiJob("typeId1", "jobId");
536
537         String url = ConsumerConsts.API_ROOT + "/info-jobs/jobId";
538         String body = gson.toJson(consumerJobInfo("typeId2", "jobId"));
539         testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Not allowed to change type for existing job");
540     }
541
542     @Test
543     void producerPutEiType() throws JsonMappingException, JsonProcessingException, ServiceException {
544         assertThat(putInfoType(TYPE_ID)).isEqualTo(HttpStatus.CREATED);
545         assertThat(putInfoType(TYPE_ID)).isEqualTo(HttpStatus.OK);
546     }
547
548     @Test
549     void producerPutEiType_noSchema() {
550         String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
551         String body = "{}";
552         testErrorCode(restClient().put(url, body), HttpStatus.BAD_REQUEST, "No schema provided");
553     }
554
555     @Test
556     void producerDeleteEiType() throws Exception {
557         putInfoType(TYPE_ID);
558         this.putEiJob(TYPE_ID, "job1");
559         this.putEiJob(TYPE_ID, "job2");
560         deleteInfoType(TYPE_ID);
561
562         assertThat(this.infoTypes.size()).isZero();
563         assertThat(this.infoJobs.size()).isZero(); // Test that also the job is deleted
564
565         testErrorCode(restClient().delete(deleteInfoTypeUrl(TYPE_ID)), HttpStatus.NOT_FOUND,
566             "Information type not found");
567     }
568
569     @Test
570     void producerDeleteEiTypeExistingProducer() throws Exception {
571         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
572         String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
573         testErrorCode(restClient().delete(url), HttpStatus.NOT_ACCEPTABLE,
574             "The type has active producers: " + PRODUCER_ID);
575         assertThat(this.infoTypes.size()).isEqualTo(1);
576     }
577
578     @Test
579     void producerPutProducerWithOneType_rejecting()
580         throws JsonMappingException, JsonProcessingException, ServiceException {
581         putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
582         String url = A1eConsts.API_ROOT + "/eijobs/" + EI_JOB_ID;
583         String body = gson.toJson(infoJobInfo());
584         restClient().put(url, body).block();
585
586         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
587         // There is one retry -> 2 calls
588         await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
589         assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
590
591         verifyJobStatus(EI_JOB_ID, "DISABLED");
592     }
593
594     @Test
595     void producerGetEiProducerTypes() throws Exception {
596         final String EI_TYPE_ID_2 = TYPE_ID + "_2";
597         putInfoProducerWithOneType("producer1", TYPE_ID);
598         putEiJob(TYPE_ID, "jobId");
599         putInfoProducerWithOneType("producer2", EI_TYPE_ID_2);
600         putEiJob(EI_TYPE_ID_2, "jobId2");
601         String url = ProducerConsts.API_ROOT + "/info-types";
602
603         ResponseEntity<String> resp = restClient().getForEntity(url).block();
604         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
605         assertThat(resp.getBody()).contains(TYPE_ID);
606         assertThat(resp.getBody()).contains(EI_TYPE_ID_2);
607     }
608
609     @Test
610     void producerPutEiProducer() throws Exception {
611         this.putInfoType(TYPE_ID);
612         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
613         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
614
615         ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
616         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
617
618         assertThat(this.infoTypes.size()).isEqualTo(1);
619         assertThat(this.infoProducers.getProducersForType(TYPE_ID)).hasSize(1);
620         assertThat(this.infoProducers.size()).isEqualTo(1);
621         assertThat(this.infoProducers.get("infoProducerId").getInfoTypes().iterator().next().getId())
622             .isEqualTo(TYPE_ID);
623
624         resp = restClient().putForEntity(url, body).block();
625         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
626
627         resp = restClient().getForEntity(url).block();
628         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
629         assertThat(resp.getBody()).isEqualTo(body);
630     }
631
632     @Test
633     void producerPutEiProducerExistingJob() throws Exception {
634         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
635         putEiJob(TYPE_ID, "jobId");
636         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
637         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
638         restClient().putForEntity(url, body).block();
639
640         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
641         await().untilAsserted(() -> assertThat(simulatorResults.jobsStarted.size()).isEqualTo(2));
642         ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
643         assertThat(request.id).isEqualTo("jobId");
644     }
645
646     @Test
647     void testPutEiProducer_noType() throws Exception {
648         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
649         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
650         testErrorCode(restClient().put(url, body), HttpStatus.NOT_FOUND, "Information type not found");
651     }
652
653     @Test
654     void producerPutProducerAndEiJob() throws Exception {
655         this.putInfoType(TYPE_ID);
656         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
657         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
658         restClient().putForEntity(url, body).block();
659         assertThat(this.infoTypes.size()).isEqualTo(1);
660         this.infoTypes.getType(TYPE_ID);
661
662         url = A1eConsts.API_ROOT + "/eijobs/jobId";
663         body = gson.toJson(infoJobInfo());
664         restClient().putForEntity(url, body).block();
665
666         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
667         await().untilAsserted(() -> assertThat(simulatorResults.jobsStarted.size()).isEqualTo(1));
668         ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
669         assertThat(request.id).isEqualTo("jobId");
670     }
671
672     @Test
673     void producerGetEiJobsForProducer() throws JsonMappingException, JsonProcessingException, ServiceException {
674         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
675         putEiJob(TYPE_ID, "jobId1");
676         putEiJob(TYPE_ID, "jobId2");
677
678         // PUT a consumerRestApiTestBase.java
679         String url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId";
680         String body = gson.toJson(producerInfoRegistratioInfo(TYPE_ID));
681         restClient().putForEntity(url, body).block();
682
683         url = ProducerConsts.API_ROOT + "/info-producers/infoProducerId/info-jobs";
684         ResponseEntity<String> resp = restClient().getForEntity(url).block();
685         assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
686
687         ProducerJobInfo[] parsedResp = gson.fromJson(resp.getBody(), ProducerJobInfo[].class);
688         assertThat(parsedResp[0].typeId).isEqualTo(TYPE_ID);
689         assertThat(parsedResp[1].typeId).isEqualTo(TYPE_ID);
690     }
691
692     @Test
693     void producerDeleteEiProducer() throws Exception {
694         putInfoProducerWithOneType("infoProducerId", TYPE_ID);
695         putInfoProducerWithOneType("infoProducerId2", TYPE_ID);
696
697         assertThat(this.infoProducers.size()).isEqualTo(2);
698         InfoType type = this.infoTypes.getType(TYPE_ID);
699         assertThat(this.infoProducers.getProducerIdsForType(type.getId())).contains("infoProducerId");
700         assertThat(this.infoProducers.getProducerIdsForType(type.getId())).contains("infoProducerId2");
701         putEiJob(TYPE_ID, "jobId");
702         assertThat(this.infoJobs.size()).isEqualTo(1);
703
704         deleteEiProducer("infoProducerId");
705         assertThat(this.infoProducers.size()).isEqualTo(1);
706         assertThat(this.infoProducers.getProducerIdsForType(TYPE_ID)).doesNotContain("infoProducerId");
707         verifyJobStatus("jobId", "ENABLED");
708
709         deleteEiProducer("infoProducerId2");
710         assertThat(this.infoProducers.size()).isZero();
711         assertThat(this.infoTypes.size()).isEqualTo(1);
712         verifyJobStatus("jobId", "DISABLED");
713     }
714
715     @Test
716     void a1eJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException {
717         ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
718         ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults();
719
720         putInfoProducerWithOneType("infoProducerId", TYPE_ID);
721         putEiJob(TYPE_ID, "jobId");
722         putInfoProducerWithOneType("infoProducerId2", TYPE_ID);
723         await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2));
724
725         deleteEiProducer("infoProducerId2");
726         assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains, one producer left
727         deleteEiProducer("infoProducerId");
728         assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains
729         assertThat(this.infoJobs.size()).isEqualTo(1); // The job remains
730         await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(1));
731         assertThat(consumerCalls.eiJobStatusCallbacks.get(0).state)
732             .isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
733
734         putInfoProducerWithOneType("infoProducerId", TYPE_ID);
735         await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(2));
736         assertThat(consumerCalls.eiJobStatusCallbacks.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
737     }
738
739     @Test
740     void a1eJobStatusNotifications2() throws JsonMappingException, JsonProcessingException, ServiceException {
741         // Test replacing a producer with new and removed types
742
743         // Create a job
744         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
745         putEiJob(TYPE_ID, EI_JOB_ID);
746
747         // change the type for the producer, the job shall be disabled
748         putInfoProducerWithOneType(PRODUCER_ID, "junk");
749         verifyJobStatus(EI_JOB_ID, "DISABLED");
750         ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
751         await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(1));
752         assertThat(consumerCalls.eiJobStatusCallbacks.get(0).state)
753             .isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
754
755         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
756         verifyJobStatus(EI_JOB_ID, "ENABLED");
757         await().untilAsserted(() -> assertThat(consumerCalls.eiJobStatusCallbacks.size()).isEqualTo(2));
758         assertThat(consumerCalls.eiJobStatusCallbacks.get(1).state).isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
759     }
760
761     @Test
762     void producerGetProducerEiType() throws JsonMappingException, JsonProcessingException, ServiceException {
763         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
764         String url = ProducerConsts.API_ROOT + "/info-types/" + TYPE_ID;
765         ResponseEntity<String> resp = restClient().getForEntity(url).block();
766         ProducerInfoTypeInfo info = gson.fromJson(resp.getBody(), ProducerInfoTypeInfo.class);
767         assertThat(info.jobDataSchema).isNotNull();
768     }
769
770     @Test
771     void producerGetProducerIdentifiers() throws JsonMappingException, JsonProcessingException, ServiceException {
772         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
773         String url = ProducerConsts.API_ROOT + "/info-producers";
774         ResponseEntity<String> resp = restClient().getForEntity(url).block();
775         assertThat(resp.getBody()).contains(PRODUCER_ID);
776
777         url = ProducerConsts.API_ROOT + "/info-producers?info_type_id=" + TYPE_ID;
778         resp = restClient().getForEntity(url).block();
779         assertThat(resp.getBody()).contains(PRODUCER_ID);
780
781         url = ProducerConsts.API_ROOT + "/info-producers?info_type_id=junk";
782         resp = restClient().getForEntity(url).block();
783         assertThat(resp.getBody()).isEqualTo("[]");
784     }
785
786     @Test
787     void producerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
788
789         ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
790         putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
791
792         {
793             // Create a job
794             putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
795             putEiJob(TYPE_ID, EI_JOB_ID);
796             verifyJobStatus(EI_JOB_ID, "ENABLED");
797             deleteEiProducer(PRODUCER_ID);
798             // A Job disabled status notification shall now be received
799             await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(1));
800             assertThat(consumerResults.eiJobStatusCallbacks.get(0).state)
801                 .isEqualTo(A1eEiJobStatus.EiJobStatusValues.DISABLED);
802             verifyJobStatus(EI_JOB_ID, "DISABLED");
803         }
804
805         assertThat(this.infoProducers.size()).isEqualTo(1);
806         assertThat(this.infoTypes.size()).isEqualTo(1);
807         assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
808
809         this.producerSupervision.createTask().blockLast();
810         this.producerSupervision.createTask().blockLast();
811
812         // Now we have one producer that is disabled
813         assertThat(this.infoProducers.size()).isEqualTo(1);
814         assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
815
816         // After 3 failed checks, the producer shall be deregistered
817         this.producerSupervision.createTask().blockLast();
818         assertThat(this.infoProducers.size()).isZero(); // The producer is removed
819         assertThat(this.infoTypes.size()).isEqualTo(1); // The type remains
820
821         // Now we have one disabled job, and no producer.
822         // PUT a producer, then a Job ENABLED status notification shall be received
823         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
824         await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(2));
825         assertThat(consumerResults.eiJobStatusCallbacks.get(1).state)
826             .isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
827         verifyJobStatus(EI_JOB_ID, "ENABLED");
828     }
829
830     @Test
831     void producerSupervision2() throws JsonMappingException, JsonProcessingException, ServiceException {
832         // Test that supervision enables not enabled jobs and sends a notification when
833         // suceeded
834
835         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
836         putEiJob(TYPE_ID, EI_JOB_ID);
837
838         InfoProducer producer = this.infoProducers.getProducer(PRODUCER_ID);
839         InfoJob job = this.infoJobs.getJob(EI_JOB_ID);
840         // Pretend that the producer did reject the job and the a DISABLED notification
841         // is sent for the job
842         producer.setJobDisabled(job);
843         job.setLastReportedStatus(false);
844         verifyJobStatus(EI_JOB_ID, "DISABLED");
845
846         // Run the supervision and wait for the job to get started in the producer
847         this.producerSupervision.createTask().blockLast();
848         ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
849         await().untilAsserted(() -> assertThat(consumerResults.eiJobStatusCallbacks.size()).isEqualTo(1));
850         assertThat(consumerResults.eiJobStatusCallbacks.get(0).state)
851             .isEqualTo(A1eEiJobStatus.EiJobStatusValues.ENABLED);
852         verifyJobStatus(EI_JOB_ID, "ENABLED");
853     }
854
855     @Test
856     void testGetStatus() throws JsonMappingException, JsonProcessingException, ServiceException {
857         putEiProducerWithOneTypeRejecting("simulateProducerError", TYPE_ID);
858         putEiProducerWithOneTypeRejecting("simulateProducerError2", TYPE_ID);
859
860         String url = "/status";
861         ResponseEntity<String> resp = restClient().getForEntity(url).block();
862         assertThat(resp.getBody()).contains("hunky dory");
863     }
864
865     @Test
866     void testEiJobDatabase() throws Exception {
867         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
868         putEiJob(TYPE_ID, "jobId1");
869         putEiJob(TYPE_ID, "jobId2");
870
871         assertThat(this.infoJobs.size()).isEqualTo(2);
872
873         {
874             InfoJob savedJob = this.infoJobs.getJob("jobId1");
875             // Restore the jobs
876             InfoJobs jobs = new InfoJobs(this.applicationConfig, this.producerCallbacks);
877             jobs.restoreJobsFromDatabase();
878             assertThat(jobs.size()).isEqualTo(2);
879             InfoJob restoredJob = jobs.getJob("jobId1");
880             assertThat(restoredJob.getId()).isEqualTo("jobId1");
881             assertThat(restoredJob.getLastUpdated()).isEqualTo(savedJob.getLastUpdated());
882
883             jobs.remove("jobId1", this.infoProducers);
884             jobs.remove("jobId2", this.infoProducers);
885         }
886         {
887             // Restore the jobs, no jobs in database
888             InfoJobs jobs = new InfoJobs(this.applicationConfig, this.producerCallbacks);
889             jobs.restoreJobsFromDatabase();
890             assertThat(jobs.size()).isZero();
891         }
892         logger.warn("Test removing a job when the db file is gone");
893         this.infoJobs.remove("jobId1", this.infoProducers);
894         assertThat(this.infoJobs.size()).isEqualTo(1);
895
896         ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
897         await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(3));
898     }
899
900     @Test
901     void testEiTypesDatabase() throws Exception {
902         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
903
904         assertThat(this.infoTypes.size()).isEqualTo(1);
905
906         {
907             // Restore the types
908             InfoTypes types = new InfoTypes(this.applicationConfig);
909             types.restoreTypesFromDatabase();
910             assertThat(types.size()).isEqualTo(1);
911         }
912         {
913             // Restore the jobs, no jobs in database
914             InfoTypes types = new InfoTypes(this.applicationConfig);
915             types.clear();
916             types.restoreTypesFromDatabase();
917             assertThat(types.size()).isZero();
918         }
919         logger.warn("Test removing a job when the db file is gone");
920         this.infoTypes.remove(this.infoTypes.getType(TYPE_ID));
921         assertThat(this.infoJobs.size()).isZero();
922     }
923
924     @Test
925     void testConsumerTypeSubscriptionDatabase() {
926         final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
927         final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
928         // PUT a subscription
929         String body = gson.toJson(info);
930         restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
931         assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
932
933         InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
934         assertThat(restoredSubscriptions.size()).isEqualTo(1);
935
936         // Delete the subscription
937         restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
938         restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
939         assertThat(restoredSubscriptions.size()).isZero();
940     }
941
942     @Test
943     void testConsumerTypeSubscription() throws Exception {
944
945         final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
946         final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
947
948         {
949             // PUT a subscription
950             String body = gson.toJson(info);
951             ResponseEntity<String> resp =
952                 restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
953             assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
954             assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.CREATED);
955             resp = restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
956             assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
957         }
958         {
959             // GET IDs
960             ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl()).block();
961             assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
962             resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=owner").block();
963             assertThat(resp.getBody()).isEqualTo("[\"subscriptionId\"]");
964             resp = restClient().getForEntity(typeSubscriptionUrl() + "?owner=junk").block();
965             assertThat(resp.getBody()).isEqualTo("[]");
966         }
967
968         {
969             // GET the individual subscription
970             ResponseEntity<String> resp = restClient().getForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
971             ConsumerTypeSubscriptionInfo respInfo = gson.fromJson(resp.getBody(), ConsumerTypeSubscriptionInfo.class);
972             assertThat(respInfo).isEqualTo(info);
973         }
974
975         {
976             // Test the callbacks
977             final ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
978
979             // Test callback for PUT type
980             this.putInfoType(TYPE_ID);
981             await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(1));
982             assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(0).state)
983                 .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED);
984
985             // Test callback for DELETE type
986             this.deleteInfoType(TYPE_ID);
987             await().untilAsserted(() -> assertThat(consumerCalls.typeRegistrationInfoCallbacks.size()).isEqualTo(2));
988             assertThat(consumerCalls.typeRegistrationInfoCallbacks.get(1).state)
989                 .isEqualTo(ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED);
990         }
991
992         {
993             // DELETE the subscription
994             ResponseEntity<String> resp =
995                 restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
996             assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);
997             assertThat(this.infoTypeSubscriptions.size()).isZero();
998             resp = restClient().getForEntity(typeSubscriptionUrl()).block();
999             assertThat(resp.getBody()).isEqualTo("[]");
1000         }
1001     }
1002
1003     @Test
1004     void testRemovingNonWorkingSubscription() throws Exception {
1005         // Test that subscriptions are removed for a unresponsive consumer
1006
1007         // PUT a subscription with a junk callback
1008         final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
1009         String body = gson.toJson(info);
1010         restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
1011         assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
1012
1013         this.putInfoType(TYPE_ID);
1014         // The callback will fail and the subscription will be removed
1015         await().untilAsserted(() -> assertThat(this.infoTypeSubscriptions.size()).isZero());
1016     }
1017
1018     @Test
1019     void testTypeSubscriptionErrorCodes() throws Exception {
1020
1021         testErrorCode(restClient().get(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
1022             "Could not find Information subscription: junk");
1023
1024         testErrorCode(restClient().delete(typeSubscriptionUrl() + "/junk"), HttpStatus.NOT_FOUND,
1025             "Could not find Information subscription: junk");
1026     }
1027
1028     private String typeSubscriptionUrl() {
1029         return ConsumerConsts.API_ROOT + "/info-type-subscription";
1030     }
1031
1032     private void deleteEiProducer(String infoProducerId) {
1033         String url = ProducerConsts.API_ROOT + "/info-producers/" + infoProducerId;
1034         restClient().deleteForEntity(url).block();
1035     }
1036
1037     private void verifyJobStatus(String jobId, String expStatus) {
1038         String url = A1eConsts.API_ROOT + "/eijobs/" + jobId + "/status";
1039         String rsp = restClient().get(url).block();
1040         assertThat(rsp).contains(expStatus);
1041     }
1042
1043     private void assertProducerOpState(String producerId,
1044         ProducerStatusInfo.OperationalState expectedOperationalState) {
1045         String statusUrl = ProducerConsts.API_ROOT + "/info-producers/" + producerId + "/status";
1046         ResponseEntity<String> resp = restClient().getForEntity(statusUrl).block();
1047         ProducerStatusInfo statusInfo = gson.fromJson(resp.getBody(), ProducerStatusInfo.class);
1048         assertThat(statusInfo.opState).isEqualTo(expectedOperationalState);
1049     }
1050
1051     ProducerInfoTypeInfo producerEiTypeRegistrationInfo(String typeId)
1052         throws JsonMappingException, JsonProcessingException {
1053         return new ProducerInfoTypeInfo(jsonSchemaObject());
1054     }
1055
1056     ProducerRegistrationInfo producerEiRegistratioInfoRejecting(String typeId)
1057         throws JsonMappingException, JsonProcessingException {
1058         return new ProducerRegistrationInfo(Arrays.asList(typeId), //
1059             baseUrl() + ProducerSimulatorController.JOB_ERROR_URL,
1060             baseUrl() + ProducerSimulatorController.SUPERVISION_ERROR_URL);
1061     }
1062
1063     ProducerRegistrationInfo producerInfoRegistratioInfo(String typeId)
1064         throws JsonMappingException, JsonProcessingException {
1065         return new ProducerRegistrationInfo(Arrays.asList(typeId), //
1066             baseUrl() + ProducerSimulatorController.JOB_URL, baseUrl() + ProducerSimulatorController.SUPERVISION_URL);
1067     }
1068
1069     private ConsumerJobInfo consumerJobInfo() throws JsonMappingException, JsonProcessingException {
1070         return consumerJobInfo(TYPE_ID, EI_JOB_ID);
1071     }
1072
1073     ConsumerJobInfo consumerJobInfo(String typeId, String infoJobId)
1074         throws JsonMappingException, JsonProcessingException {
1075         return new ConsumerJobInfo(typeId, jsonObject(), "owner", "https://junk.com",
1076             baseUrl() + ConsumerSimulatorController.getJobStatusUrl(infoJobId));
1077     }
1078
1079     private A1eEiJobInfo infoJobInfo() throws JsonMappingException, JsonProcessingException {
1080         return infoJobInfo(TYPE_ID, EI_JOB_ID);
1081     }
1082
1083     A1eEiJobInfo infoJobInfo(String typeId, String infoJobId) throws JsonMappingException, JsonProcessingException {
1084         return new A1eEiJobInfo(typeId, jsonObject(), "owner", "https://junk.com",
1085             baseUrl() + ConsumerSimulatorController.getJobStatusUrl(infoJobId));
1086     }
1087
1088     private Object jsonObject(String json) {
1089         try {
1090             return JsonParser.parseString(json).getAsJsonObject();
1091         } catch (Exception e) {
1092             throw new NullPointerException(e.toString());
1093         }
1094     }
1095
1096     private Object jsonSchemaObject() {
1097         // a json schema with one mandatory property named "string"
1098         String schemaStr = "{" //
1099             + "\"$schema\": \"http://json-schema.org/draft-04/schema#\"," //
1100             + "\"type\": \"object\"," //
1101             + "\"properties\": {" //
1102             + EI_JOB_PROPERTY + " : {" //
1103             + "    \"type\": \"string\"" //
1104             + "  }" //
1105             + "}," //
1106             + "\"required\": [" //
1107             + EI_JOB_PROPERTY //
1108             + "]" //
1109             + "}"; //
1110         return jsonObject(schemaStr);
1111     }
1112
1113     private Object jsonObject() {
1114         return jsonObject("{ " + EI_JOB_PROPERTY + " : \"value\" }");
1115     }
1116
1117     private InfoJob putEiJob(String infoTypeId, String jobId)
1118         throws JsonMappingException, JsonProcessingException, ServiceException {
1119
1120         String url = A1eConsts.API_ROOT + "/eijobs/" + jobId;
1121         String body = gson.toJson(infoJobInfo(infoTypeId, jobId));
1122         restClient().putForEntity(url, body).block();
1123
1124         return this.infoJobs.getJob(jobId);
1125     }
1126
1127     private HttpStatus putInfoType(String infoTypeId)
1128         throws JsonMappingException, JsonProcessingException, ServiceException {
1129         String url = ProducerConsts.API_ROOT + "/info-types/" + infoTypeId;
1130         String body = gson.toJson(producerEiTypeRegistrationInfo(infoTypeId));
1131
1132         ResponseEntity<String> resp = restClient().putForEntity(url, body).block();
1133         this.infoTypes.getType(infoTypeId);
1134         return resp.getStatusCode();
1135     }
1136
1137     private String deleteInfoTypeUrl(String typeId) {
1138         return ProducerConsts.API_ROOT + "/info-types/" + typeId;
1139     }
1140
1141     private void deleteInfoType(String typeId) {
1142         restClient().delete(deleteInfoTypeUrl(typeId)).block();
1143     }
1144
1145     private InfoType putEiProducerWithOneTypeRejecting(String producerId, String infoTypeId)
1146         throws JsonMappingException, JsonProcessingException, ServiceException {
1147         this.putInfoType(infoTypeId);
1148         String url = ProducerConsts.API_ROOT + "/info-producers/" + producerId;
1149         String body = gson.toJson(producerEiRegistratioInfoRejecting(infoTypeId));
1150         restClient().putForEntity(url, body).block();
1151         return this.infoTypes.getType(infoTypeId);
1152     }
1153
1154     private InfoType putInfoProducerWithOneType(String producerId, String infoTypeId)
1155         throws JsonMappingException, JsonProcessingException, ServiceException {
1156         this.putInfoType(infoTypeId);
1157
1158         String url = ProducerConsts.API_ROOT + "/info-producers/" + producerId;
1159         String body = gson.toJson(producerInfoRegistratioInfo(infoTypeId));
1160
1161         restClient().putForEntity(url, body).block();
1162
1163         return this.infoTypes.getType(infoTypeId);
1164     }
1165
1166     private String baseUrl() {
1167         return "https://localhost:" + this.port;
1168     }
1169
1170     private AsyncRestClient restClient(boolean useTrustValidation) {
1171         WebClientConfig config = this.applicationConfig.getWebClientConfig();
1172         HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
1173             .httpProxyHost("") //
1174             .httpProxyPort(0) //
1175             .build();
1176         config = ImmutableWebClientConfig.builder() //
1177             .keyStoreType(config.keyStoreType()) //
1178             .keyStorePassword(config.keyStorePassword()) //
1179             .keyStore(config.keyStore()) //
1180             .keyPassword(config.keyPassword()) //
1181             .isTrustStoreUsed(useTrustValidation) //
1182             .trustStore(config.trustStore()) //
1183             .trustStorePassword(config.trustStorePassword()) //
1184             .httpProxyConfig(httpProxyConfig).build();
1185
1186         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
1187         return restClientFactory.createRestClientNoHttpProxy(baseUrl());
1188     }
1189
1190     private AsyncRestClient restClient() {
1191         return restClient(false);
1192     }
1193
1194     private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
1195         testErrorCode(request, expStatus, responseContains, true);
1196     }
1197
1198     private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
1199         boolean expectApplicationProblemJsonMediaType) {
1200         StepVerifier.create(request) //
1201             .expectSubscription() //
1202             .expectErrorMatches(
1203                 t -> checkWebClientError(t, expStatus, responseContains, expectApplicationProblemJsonMediaType)) //
1204             .verify();
1205     }
1206
1207     private boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
1208         boolean expectApplicationProblemJsonMediaType) {
1209         assertTrue(throwable instanceof WebClientResponseException);
1210         WebClientResponseException responseException = (WebClientResponseException) throwable;
1211         assertThat(responseException.getStatusCode()).isEqualTo(expStatus);
1212         assertThat(responseException.getResponseBodyAsString()).contains(responseContains);
1213         if (expectApplicationProblemJsonMediaType) {
1214             assertThat(responseException.getHeaders().getContentType()).isEqualTo(MediaType.APPLICATION_PROBLEM_JSON);
1215         }
1216         return true;
1217     }
1218
1219 }