Fix out-of-order errors in join operator
[com/gs-lite.git] / include / hfta / join_eq_hash_operator.h
index 8324a31..0b9a9f1 100644 (file)
@@ -30,7 +30,7 @@ using namespace std;
 #define JOIN_OP_OUTER_JOIN 3
 
 
-#define MAX_TUPLE_SIZE 1024
+#define MAX_TUPLE_SIZE 10240
 
 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
 class join_eq_hash_operator : public base_operator {
@@ -44,11 +44,18 @@ int n_calls, n_iters, n_eqk;
 
        // list of tuples from one of the channel waiting to be compared
        // against tuple from the other channel
+       // Normally at least one should be empty after processing accept_tuple
        list<host_tuple> input_queue[2];
 
 //             Admission control timestamp objects
-       timestamp *max_input_ts[2], *curr_ts;
-       bool hash_empty, curr_ts_valid;
+       timestamp *max_input_ts[2];     // largest timestamp received on this channel
+                                                               // perhaps from a temporal tuple
+       timestamp *curr_ts;                     // current ts being processed.
+       bool curr_ts_valid;                     // both channels have a ts so curr_ts has been
+                                                               // assigned a value
+
+       bool hash_empty;                        // always true, seems an artifact
+
 
 //     max tuples received from input channels
        char max_input_tuple_data[2][MAX_TUPLE_SIZE];
@@ -91,6 +98,7 @@ int n_calls, n_iters, n_eqk;
        int compare_qts_to_hashts(int i){
                timestamp tmp_ts;
                if(max_input_ts[i] == NULL) return(-1);
+//printf("compare_qts_to_hashts channel %d: ",i);
                if(input_queue[i].empty())
                        return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
                func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
@@ -117,18 +125,21 @@ int n_calls, n_iters, n_eqk;
                        rts = &tmp_rts;
                }
 
+//printf("compare_qts : ");
                return(func.compare_ts_with_ts(lts,rts));
        }
 
        int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
                timestamp tmp_ts;
                func.load_ts_from_tup(&tmp_ts, tup);
+//printf("compare_tup_with_ts channel %d: ",tup.channel);
                return(func.compare_ts_with_ts(&tmp_ts, ts));
        }
 
        void process_join(list<host_tuple>& result){
          int i;
          for(i=0;i<2;++i){
+//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
                while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
 //                             apply tuples to join
                        int other = 1-i;        // the other channel
@@ -176,6 +187,8 @@ int n_calls, n_iters, n_eqk;
     host_tuple empty_tuple;
        empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
 
+//printf("Processing outer join\n");
+
        hash_empty = true;
        typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
        for(i=0;i<2;++i){
@@ -249,6 +262,7 @@ n_calls=0; n_iters=0; n_eqk=0;
 
 //                     Ensure that the queue ts is initialized.
                if(max_input_ts[tup.channel] == NULL){
+//printf("Loading channel %d\n",tup.channel);
                        max_input_ts[tup.channel] = new timestamp();
                        if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
                                tup.free_tuple();
@@ -275,9 +289,10 @@ n_calls=0; n_iters=0; n_eqk=0;
                        return(0);      // can't load ts -- bail out.
                }
 
+//printf("accept_tuple channel=%d: ",tup.channel);
            int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
                if (tup_order < 0){
-printf("%s: out of order ts.\n", op_name);
+                       printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
                        tup.free_tuple();
 
                        // even for out of order temporal tuples we need to post new temporal tuple
@@ -292,6 +307,7 @@ printf("%s: out of order ts.\n", op_name);
 
 //     Update max if larger
                if(tup_order > 0){
+//printf("Loading channel %d\n",tup.channel);
                        func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
 
                        // save the content of the max tuple
@@ -345,6 +361,7 @@ printf("%s: out of order ts.\n", op_name);
        }
 
        int flush(list<host_tuple>& result) {
+//printf("Calling flush\n");
 
                process_outer_join(result);
 
@@ -376,28 +393,39 @@ printf("%s: out of order ts.\n", op_name);
                int qcmp = compare_qts();
                int minq = 0; if(qcmp>0) minq = 1;
 
-               host_tuple left_tuple, right_tuple;
-               left_tuple.tuple_size=0; left_tuple.data=NULL;
-               right_tuple.tuple_size=0; right_tuple.data=NULL;
+               timestamp left_ts, right_ts;
+               timestamp *left_ts_ptr = &left_ts;
+               timestamp *right_ts_ptr = &right_ts;
 
-               if (minq == 0) {
-                       if(max_input_ts[minq]) {
-                               if (input_queue[minq].empty())
-                                       left_tuple = max_input_tuple[minq];
-                               else
-                                       left_tuple = input_queue[minq].front();
-                       }
-               } else {
-                       if(max_input_ts[minq]) {
-                               if (input_queue[minq].empty())
-                                       right_tuple = max_input_tuple[minq];
-                               else
-                                       right_tuple = input_queue[minq].front();
-                       }
+               
+/*
+               if (input_queue[0].empty()){
+                       printf("L=max_ts, ");
+                       left_ts_ptr = max_input_ts[0];
+               }else{
+                       printf("L=q, ");
+                       func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
+               }
+               
+               if (input_queue[1].empty()){
+                       printf("R=max_ts, ");
+                       right_ts_ptr = max_input_ts[1];
+               }else{
+                       printf("R=q, ");
+                       func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
+               }
+*/
+               if(curr_ts_valid){
+                       left_ts_ptr = curr_ts;
+                       right_ts_ptr = curr_ts;
+               }else{
+//printf("curr_ts invalid\n");
+                       left_ts_ptr = NULL;
+                       right_ts_ptr = NULL;
                }
 
                result.channel = output_channel;
-               return func.create_temp_status_tuple(left_tuple, right_tuple, result);
+               return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
        }