import { MatPaginator } from "@angular/material/paginator";
import { Sort } from "@angular/material/sort";
import { MatTableDataSource } from "@angular/material/table";
-import { EMPTY, forkJoin, Subscription, timer } from "rxjs";
+import { EMPTY, forkJoin, of, pipe, Subscription, concat, Observable } from "rxjs";
import { BehaviorSubject } from "rxjs/BehaviorSubject";
-import { mergeMap, finalize, map, tap, switchMap } from "rxjs/operators";
-import { JobInfo } from "@interfaces/producer.types";
-import { ProducerService } from "@services/ei/producer.service";
+import { mergeMap, finalize, map, tap, concatMap, delay, skip, catchError } from "rxjs/operators";
+import { ConsumerService } from "@services/ei/consumer.service";
import { UiService } from "@services/ui/ui.service";
+import { OperationalState } from '@app/interfaces/consumer.types';
export interface Job {
jobId: string;
- jobData: any;
typeId: string;
targetUri: string;
owner: string;
- prodId: string;
+ prodIds: string[];
+ status: OperationalState;
}
@Component({
subscription: Subscription;
checked: boolean = false;
firstTime: boolean = true;
+ jobList: Job[] = [];
- constructor(private producerService: ProducerService, private ui: UiService) {
+ constructor(private consumerService: ConsumerService, private ui: UiService) {
this.jobForm = new FormGroup({
jobId: new FormControl(""),
typeId: new FormControl(""),
owner: new FormControl(""),
targetUri: new FormControl(""),
- prodId: new FormControl(""),
+ prodIds: new FormControl(""),
+ status: new FormControl("")
});
}
this.isDataIncluding(data.jobId, searchTerms.jobId) &&
this.isDataIncluding(data.owner, searchTerms.owner) &&
this.isDataIncluding(data.typeId, searchTerms.typeId) &&
- this.isDataIncluding(data.prodId, searchTerms.prodId)
+ this.isArrayIncluding(data.prodIds, searchTerms.prodIds) &&
+ this.isDataIncluding(data.status, searchTerms.status)
);
}) as (data: Job, filter: any) => boolean;
});
}
dataSubscription(): Subscription {
- let prodId = [];
- const jobs$ = this.producerService.getProducerIds().pipe(
- tap((data) => (prodId = data)),
- mergeMap((prodIds) =>
- forkJoin(prodIds.map((id) => this.producerService.getJobsForProducer(id)))
+ const jobsInfo$ = this.consumerService.getJobIds().pipe(
+ tap((_) => {
+ this.jobList = [] as Job[];
+ }),
+ mergeMap((jobIds) =>
+ forkJoin(jobIds.map((jobId) => {
+ return forkJoin([
+ of(jobId).pipe(
+ catchError(err => {
+ return of([-1]);
+ })
+ ),
+ this.consumerService.getJobInfo(jobId).pipe(
+ catchError(err => {
+ return of([-1]);
+ })),
+ this.consumerService.getConsumerStatus(jobId).pipe(
+ catchError(err => {
+ return of([-1]);
+ })),
+ ])
+ }))
),
- finalize(() => this.loadingSubject$.next(false))
+ finalize(() => {
+ this.loadingSubject$.next(false);
+ this.jobsSubject$.next(this.jobList);
+ })
);
+ const whenToRefresh$ = of('').pipe(
+ delay(10000),
+ tap((_) => this.refresh$.next('')),
+ skip(1),
+ );
+
+ const poll$ = concat(jobsInfo$, whenToRefresh$);
+
const refreshedJobs$ = this.refresh$.pipe(
- switchMap((_) =>
- timer(0, 10000).pipe(
- tap((_) => {
- this.loadingSubject$.next(true);
- }),
- switchMap((_) => jobs$),
- map((response) => this.extractJobs(prodId, response))
- )
- )
+ tap((_) => {
+ this.loadingSubject$.next(true);
+ }),
+ concatMap((_) => this.checked ? poll$ : jobsInfo$),
+ map((response) => this.extractJobs(response))
);
return this.polling$
.pipe(
- switchMap((value) => {
+ concatMap((value) => {
let pollCondition = value == 0 || this.checked;
return pollCondition ? refreshedJobs$ : EMPTY;
})
this.jobForm.get("typeId").setValue("");
this.jobForm.get("owner").setValue("");
this.jobForm.get("targetUri").setValue("");
- this.jobForm.get("prodId").setValue("");
+ this.jobForm.get("prodIds").setValue("");
+ this.jobForm.get("status").setValue("");
}
sortJobs(sort: Sort) {
return this.compare(a.owner, b.owner, isAsc);
case "targetUri":
return this.compare(a.targetUri, b.targetUri, isAsc);
- case "prodId":
- return this.compare(a.prodId, b.prodId, isAsc);
+ case "prodIds":
+ return this.compare(a.prodIds, b.prodIds, isAsc);
+ case "status":
+ return this.compare(a.status, b.status, isAsc);
default:
return 0;
}
stopPolling(checked) {
this.checked = checked;
this.polling$.next(this.jobs().length);
+ if (this.checked) {
+ this.refreshDataClick();
+ }
}
compare(a: any, b: any, isAsc: boolean) {
return data.toLowerCase().includes(transformedFilter);
}
+ isArrayIncluding(data: string[], filter: string): boolean {
+ if (!data)
+ return true;
+ for (let i = 0; i < data.length; i++) {
+ return this.isDataIncluding(data[i], filter);
+ }
+ }
+
getJobTypeId(job: Job): string {
if (job.typeId) {
return job.typeId;
return this.jobsSubject$.value;
}
- private extractJobs(prodId: number[], res: JobInfo[][]) {
+ private extractJobs(res: any) {
this.clearFilter();
- let jobList = [];
- prodId.forEach((element, index) => {
- let jobs = res[index];
- jobList = jobList.concat(jobs.map((job) => this.createJob(element, job)));
+ res.forEach(element => {
+ if (element[0] != -1) {
+ if (element[1] != -1 && element[2] != -1) {
+ let jobObj = <Job>{};
+ jobObj.jobId = element[0];
+ jobObj.owner = element[1].job_owner;
+ jobObj.targetUri = element[1].job_result_uri;
+ jobObj.typeId = element[1].info_type_id;
+ jobObj.prodIds = (element[2].producers) ? element[2].producers : ["No Producers"];
+ jobObj.status = element[2].info_job_status;
+ this.jobList = this.jobList.concat(jobObj);
+ } else {
+ let jobObj = <Job>{};
+ jobObj.jobId = element[0];
+ if (element[1] == -1) {
+ jobObj.owner = "--Missing information--";
+ jobObj.targetUri = "--Missing information--";
+ jobObj.typeId = "--Missing information--";
+ }
+ if (element[2] == -1) {
+ jobObj.prodIds = "--Missing information--" as unknown as [];
+ jobObj.status = "--Missing information--" as OperationalState;
+ }
+ this.jobList = this.jobList.concat(jobObj);
+ }
+ }
});
- this.jobsSubject$.next(jobList);
- if (this.firstTime && jobList.length > 0) {
- this.polling$.next(jobList.length);
+
+ if (this.firstTime && this.jobList.length > 0) {
+ this.polling$.next(this.jobList.length);
this.firstTime = false;
}
- return jobList;
+ return this.jobList;
}
- createJobList(prodId: any[], result: JobInfo[][]) {
- let jobList = [];
- prodId.forEach((element, index) => {
- let jobs = result[index];
- jobList = jobList.concat(jobs.map((job) => this.createJob(element, job)));
- });
- return jobList;
+ refreshDataClick() {
+ this.refresh$.next("");
}
- createJob(element: any, job: JobInfo): any {
- let infoJob = <Job>{};
- infoJob.jobId = job.info_job_identity;
- infoJob.typeId = job.info_type_identity;
- infoJob.owner = job.owner;
- infoJob.targetUri = job.target_uri;
- infoJob.prodId = element;
- return infoJob;
+ hasJobs(): boolean {
+ return this.jobs().length > 0;
}
- refreshDataClick() {
- this.refresh$.next("");
- }
}