X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=include%2Fhfta%2Fjoin_eq_hash_operator.h;h=880cc53c4bb3485627e42c2f492dd038569b871e;hb=52bf6cf53a585197f998187399ebfd88681d4490;hp=8324a3150def17c8f5f538875b1c14d12dc59351;hpb=f1754ecea2eab7bd0a302042ac82eb11667b166c;p=com%2Fgs-lite.git diff --git a/include/hfta/join_eq_hash_operator.h b/include/hfta/join_eq_hash_operator.h index 8324a31..880cc53 100644 --- a/include/hfta/join_eq_hash_operator.h +++ b/include/hfta/join_eq_hash_operator.h @@ -30,7 +30,7 @@ using namespace std; #define JOIN_OP_OUTER_JOIN 3 -#define MAX_TUPLE_SIZE 1024 +#define MAX_TUPLE_SIZE 10240 template class join_eq_hash_operator : public base_operator { @@ -44,11 +44,18 @@ int n_calls, n_iters, n_eqk; // list of tuples from one of the channel waiting to be compared // against tuple from the other channel + // Normally at least one should be empty after processing accept_tuple list input_queue[2]; // Admission control timestamp objects - timestamp *max_input_ts[2], *curr_ts; - bool hash_empty, curr_ts_valid; + timestamp *max_input_ts[2]; // largest timestamp received on this channel + // perhaps from a temporal tuple + timestamp *curr_ts; // current ts being processed. + bool curr_ts_valid; // both channels have a ts so curr_ts has been + // assigned a value + + bool hash_empty; // always true, seems an artifact + // max tuples received from input channels char max_input_tuple_data[2][MAX_TUPLE_SIZE]; @@ -91,6 +98,7 @@ int n_calls, n_iters, n_eqk; int compare_qts_to_hashts(int i){ timestamp tmp_ts; if(max_input_ts[i] == NULL) return(-1); +//printf("compare_qts_to_hashts channel %d: ",i); if(input_queue[i].empty()) return(func.compare_ts_with_ts(max_input_ts[i], curr_ts)); func.load_ts_from_tup(&tmp_ts,input_queue[i].front()); @@ -117,18 +125,21 @@ int n_calls, n_iters, n_eqk; rts = &tmp_rts; } +//printf("compare_qts : "); return(func.compare_ts_with_ts(lts,rts)); } int compare_tup_with_ts(host_tuple &tup, timestamp *ts){ timestamp tmp_ts; func.load_ts_from_tup(&tmp_ts, tup); +//printf("compare_tup_with_ts channel %d: ",tup.channel); return(func.compare_ts_with_ts(&tmp_ts, ts)); } void process_join(list& result){ int i; for(i=0;i<2;++i){ +//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size()); while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){ // apply tuples to join int other = 1-i; // the other channel @@ -176,6 +187,8 @@ int n_calls, n_iters, n_eqk; host_tuple empty_tuple; empty_tuple.tuple_size = 0; empty_tuple.data = NULL; +//printf("Processing outer join\n"); + hash_empty = true; typename hash_table::iterator jti; for(i=0;i<2;++i){ @@ -206,7 +219,7 @@ int n_calls, n_iters, n_eqk; } } } - join_tbl[i].clear(); join_tbl[i].rehash(); + join_tbl[i].clear(); join_tbl[i].resize(); } } @@ -249,6 +262,7 @@ n_calls=0; n_iters=0; n_eqk=0; // Ensure that the queue ts is initialized. if(max_input_ts[tup.channel] == NULL){ +//printf("Loading channel %d\n",tup.channel); max_input_ts[tup.channel] = new timestamp(); if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){ tup.free_tuple(); @@ -275,9 +289,10 @@ n_calls=0; n_iters=0; n_eqk=0; return(0); // can't load ts -- bail out. } +//printf("accept_tuple channel=%d: ",tup.channel); int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]); if (tup_order < 0){ -printf("%s: out of order ts.\n", op_name); + // printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel); tup.free_tuple(); // even for out of order temporal tuples we need to post new temporal tuple @@ -292,6 +307,7 @@ printf("%s: out of order ts.\n", op_name); // Update max if larger if(tup_order > 0){ +//printf("Loading channel %d\n",tup.channel); func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts); // save the content of the max tuple @@ -345,6 +361,7 @@ printf("%s: out of order ts.\n", op_name); } int flush(list& result) { +//printf("Calling flush\n"); process_outer_join(result); @@ -376,28 +393,39 @@ printf("%s: out of order ts.\n", op_name); int qcmp = compare_qts(); int minq = 0; if(qcmp>0) minq = 1; - host_tuple left_tuple, right_tuple; - left_tuple.tuple_size=0; left_tuple.data=NULL; - right_tuple.tuple_size=0; right_tuple.data=NULL; + timestamp left_ts, right_ts; + timestamp *left_ts_ptr = &left_ts; + timestamp *right_ts_ptr = &right_ts; - if (minq == 0) { - if(max_input_ts[minq]) { - if (input_queue[minq].empty()) - left_tuple = max_input_tuple[minq]; - else - left_tuple = input_queue[minq].front(); - } - } else { - if(max_input_ts[minq]) { - if (input_queue[minq].empty()) - right_tuple = max_input_tuple[minq]; - else - right_tuple = input_queue[minq].front(); - } + +/* + if (input_queue[0].empty()){ + printf("L=max_ts, "); + left_ts_ptr = max_input_ts[0]; + }else{ + printf("L=q, "); + func.load_ts_from_tup(left_ts_ptr, input_queue[0].front()); + } + + if (input_queue[1].empty()){ + printf("R=max_ts, "); + right_ts_ptr = max_input_ts[1]; + }else{ + printf("L=q, "); + func.load_ts_from_tup(right_ts_ptr, input_queue[1].front()); + } +*/ + if(curr_ts_valid){ + left_ts_ptr = curr_ts; + right_ts_ptr = curr_ts; + }else{ +//printf("curr_ts invalid\n"); + left_ts_ptr = NULL; + right_ts_ptr = NULL; } result.channel = output_channel; - return func.create_temp_status_tuple(left_tuple, right_tuple, result); + return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result); }