#define JOIN_OP_OUTER_JOIN 3
-#define MAX_TUPLE_SIZE 1024
+#define MAX_TUPLE_SIZE 10240
template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
class join_eq_hash_operator : public base_operator {
// list of tuples from one of the channel waiting to be compared
// against tuple from the other channel
+ // Normally at least one should be empty after processing accept_tuple
list<host_tuple> input_queue[2];
// Admission control timestamp objects
- timestamp *max_input_ts[2], *curr_ts;
- bool hash_empty, curr_ts_valid;
+ timestamp *max_input_ts[2]; // largest timestamp received on this channel
+ // perhaps from a temporal tuple
+ timestamp *curr_ts; // current ts being processed.
+ bool curr_ts_valid; // both channels have a ts so curr_ts has been
+ // assigned a value
+
+ bool hash_empty; // always true, seems an artifact
+
// max tuples received from input channels
char max_input_tuple_data[2][MAX_TUPLE_SIZE];
int compare_qts_to_hashts(int i){
timestamp tmp_ts;
if(max_input_ts[i] == NULL) return(-1);
+//printf("compare_qts_to_hashts channel %d: ",i);
if(input_queue[i].empty())
return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
rts = &tmp_rts;
}
+//printf("compare_qts : ");
return(func.compare_ts_with_ts(lts,rts));
}
int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
timestamp tmp_ts;
func.load_ts_from_tup(&tmp_ts, tup);
+//printf("compare_tup_with_ts channel %d: ",tup.channel);
return(func.compare_ts_with_ts(&tmp_ts, ts));
}
void process_join(list<host_tuple>& result){
int i;
for(i=0;i<2;++i){
+//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
// apply tuples to join
int other = 1-i; // the other channel
host_tuple empty_tuple;
empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
+//printf("Processing outer join\n");
+
hash_empty = true;
typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
for(i=0;i<2;++i){
}
}
}
- join_tbl[i].clear(); join_tbl[i].rehash();
+ join_tbl[i].clear(); join_tbl[i].resize();
}
}
// Ensure that the queue ts is initialized.
if(max_input_ts[tup.channel] == NULL){
+//printf("Loading channel %d\n",tup.channel);
max_input_ts[tup.channel] = new timestamp();
if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
tup.free_tuple();
return(0); // can't load ts -- bail out.
}
+//printf("accept_tuple channel=%d: ",tup.channel);
int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
if (tup_order < 0){
-printf("%s: out of order ts.\n", op_name);
+ printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
tup.free_tuple();
// even for out of order temporal tuples we need to post new temporal tuple
// Update max if larger
if(tup_order > 0){
+//printf("Loading channel %d\n",tup.channel);
func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
// save the content of the max tuple
}
int flush(list<host_tuple>& result) {
+//printf("Calling flush\n");
process_outer_join(result);
int qcmp = compare_qts();
int minq = 0; if(qcmp>0) minq = 1;
- host_tuple empty_tuple;
- empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
- host_tuple& left_tuple = empty_tuple;
- host_tuple& right_tuple = empty_tuple;
-
- if (minq == 0) {
- if(max_input_ts[minq]) {
- if (input_queue[minq].empty())
- left_tuple = max_input_tuple[minq];
- else
- left_tuple = input_queue[minq].front();
- }
- } else {
- if(max_input_ts[minq]) {
- if (input_queue[minq].empty())
- right_tuple = max_input_tuple[minq];
- else
- right_tuple = input_queue[minq].front();
- }
+ timestamp left_ts, right_ts;
+ timestamp *left_ts_ptr = &left_ts;
+ timestamp *right_ts_ptr = &right_ts;
+
+
+/*
+ if (input_queue[0].empty()){
+ printf("L=max_ts, ");
+ left_ts_ptr = max_input_ts[0];
+ }else{
+ printf("L=q, ");
+ func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
+ }
+
+ if (input_queue[1].empty()){
+ printf("R=max_ts, ");
+ right_ts_ptr = max_input_ts[1];
+ }else{
+ printf("L=q, ");
+ func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
+ }
+*/
+ if(curr_ts_valid){
+ left_ts_ptr = curr_ts;
+ right_ts_ptr = curr_ts;
+ }else{
+//printf("curr_ts invalid\n");
+ left_ts_ptr = NULL;
+ right_ts_ptr = NULL;
}
result.channel = output_channel;
- return func.create_temp_status_tuple(left_tuple, right_tuple, result);
+ return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
}