Code Review
/
com
/
gs-lite.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Refactor csv input processing. Add support for kafka interfaces. Fix bug in join...
[com/gs-lite.git]
/
src
/
ftacmp
/
stream_query.cc
diff --git
a/src/ftacmp/stream_query.cc
b/src/ftacmp/stream_query.cc
index
9d12cb0
..
faa1ea3
100644
(file)
--- a/
src/ftacmp/stream_query.cc
+++ b/
src/ftacmp/stream_query.cc
@@
-284,7
+284,8
@@
bool stream_query::generate_linkage(){
qhead = n;
}
}
qhead = n;
}
}
- if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size()){
+// does the query node read a source, or is it a source?
+ if(query_plan[n]->n_predecessors() < query_plan[n]->get_input_tbls().size() || query_plan[n]->get_input_tbls().size() == 0){
qtail.push_back(n);
}
}
qtail.push_back(n);
}
}
@@
-1956,6
+1957,10
@@
void get_prefilter_temporal_cids(std::vector<stream_query *> lfta_list, col_id_s
ci.load_from_colref(fj_node->temporal_var);
temp_cids.insert(ci);
}
ci.load_from_colref(fj_node->temporal_var);
temp_cids.insert(ci);
}
+ if(lfta_list[s]->query_plan[0]->node_type() == "watch_join"){
+ watch_join_qpn *wj_node = (watch_join_qpn *)lfta_list[s]->query_plan[0];
+ sl_list = wj_node->get_select_se_list();
+ }
for(sl=0;sl<sl_list.size();sl++){
data_type *sdt = sl_list[sl]->get_data_type();
for(sl=0;sl<sl_list.size();sl++){
data_type *sdt = sl_list[sl]->get_data_type();