-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
-\r
- http://www.apache.org/licenses/LICENSE-2.0\r
-\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#ifndef JOIN_EQ_HASH_OPERATOR_H\r
-#define JOIN_EQ_HASH_OPERATOR_H\r
-\r
-#include "host_tuple.h"\r
-#include "base_operator.h"\r
-#include <list>\r
-#include"hash_table.h"\r
-using namespace std;\r
-\r
-#include <stdio.h>\r
-\r
-#define JOIN_OP_INNER_JOIN 0\r
-#define JOIN_OP_LEFT_OUTER_JOIN 1\r
-#define JOIN_OP_RIGHT_OUTER_JOIN 2\r
-#define JOIN_OP_OUTER_JOIN 3\r
-\r
-\r
-#define MAX_TUPLE_SIZE 1024\r
-\r
-template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>\r
-class join_eq_hash_operator : public base_operator {\r
-private :\r
- // type of join : inner vs. outer\r
- unsigned int join_type;\r
-int n_calls, n_iters, n_eqk;\r
-\r
- // for tracing\r
- int sch0, sch1;\r
-\r
- // list of tuples from one of the channel waiting to be compared\r
- // against tuple from the other channel\r
- list<host_tuple> input_queue[2];\r
-\r
-// Admission control timestamp objects\r
- timestamp *max_input_ts[2], *curr_ts;\r
- bool hash_empty, curr_ts_valid;\r
-\r
-// max tuples received from input channels\r
- char max_input_tuple_data[2][MAX_TUPLE_SIZE];\r
- host_tuple max_input_tuple[2];\r
-\r
-// The hash tables for the join algorithm\r
- hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];\r
-\r
-\r
- // comparator object used to provide methods for performing the joins.\r
- join_eq_hash_functor func;\r
-\r
- // soft limit on queue size - we consider operator blocked on its input\r
- // whenever we reach this soft limit (not used anymore)\r
- size_t soft_queue_size_limit;\r
-\r
-// For matching on join hash key\r
- equal_func equal_key;\r
-\r
- // memory footprint for the join queues in bytes\r
- unsigned int queue_mem;\r
-\r
-\r
- // appends tuple to the end of the one of the input queues\r
- // if tuple is stack resident, makes it heap resident\r
- int append_tuple(host_tuple& tup, int q) {\r
- int ret = input_queue[q].empty() ? 1 : 2;\r
- if (!tup.heap_resident) {\r
- char* data = (char*)malloc(tup.tuple_size);\r
- memcpy(data, tup.data, tup.tuple_size);\r
- tup.data = data;\r
- tup.heap_resident = true;\r
- }\r
- input_queue[q].push_back(tup);\r
- queue_mem += tup.tuple_size;\r
- return ret;\r
- }\r
-\r
-// -1 if input queue i has smaller ts, 0 it equal, 1 if curr_ts is smaller\r
- int compare_qts_to_hashts(int i){\r
- timestamp tmp_ts;\r
- if(max_input_ts[i] == NULL) return(-1);\r
- if(input_queue[i].empty())\r
- return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));\r
- func.load_ts_from_tup(&tmp_ts,input_queue[i].front());\r
- return(func.compare_ts_with_ts(&tmp_ts, curr_ts));\r
- }\r
-\r
-// -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal\r
- int compare_qts(){\r
- if(max_input_ts[0] == NULL) return(-1);\r
- if(max_input_ts[1] == NULL) return(1);\r
- timestamp tmp_lts, tmp_rts, *lts,*rts;\r
-\r
- if(input_queue[0].empty()){\r
- lts = max_input_ts[0];\r
- }else{\r
- func.load_ts_from_tup(&tmp_lts, input_queue[0].front());\r
- lts = &tmp_lts;\r
- }\r
-\r
- if(input_queue[1].empty()){\r
- rts = max_input_ts[1];\r
- }else{\r
- func.load_ts_from_tup(&tmp_rts, input_queue[1].front());\r
- rts = &tmp_rts;\r
- }\r
-\r
- return(func.compare_ts_with_ts(lts,rts));\r
- }\r
-\r
- int compare_tup_with_ts(host_tuple &tup, timestamp *ts){\r
- timestamp tmp_ts;\r
- func.load_ts_from_tup(&tmp_ts, tup);\r
- return(func.compare_ts_with_ts(&tmp_ts, ts));\r
- }\r
-\r
- void process_join(list<host_tuple>& result){\r
- int i;\r
- for(i=0;i<2;++i){\r
- while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){\r
-// apply tuples to join\r
- int other = 1-i; // the other channel\r
- bool failed;\r
-\r
-// Get tuple from list\r
- host_tuple qtup = input_queue[i].front();\r
- input_queue[i].pop_front();\r
- queue_mem -= qtup.tuple_size;\r
-\r
-// Put it into its join table\r
- hashkey *qtup_key = func.create_key(qtup,failed); // on heap\r
- if(failed){\r
- qtup.free_tuple();\r
- continue;\r
- }\r
- join_tbl[i].insert(qtup_key, qtup);\r
-\r
-// Join with matching tuples in other table.\r
-\r
- typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);\r
- while(jti != join_tbl[other].end()){\r
- if(equal_key((*jti).first, qtup_key)){\r
- host_tuple otup;\r
- if(i==0)\r
- otup = func.create_output_tuple( qtup, (*jti).second, failed );\r
- else\r
- otup = func.create_output_tuple( (*jti).second, qtup, failed );\r
- if(!failed){\r
- otup.channel = output_channel;\r
- result.push_back(otup);\r
- qtup_key->touch();\r
- (*jti).first->touch();\r
- }\r
- }\r
- jti = jti.next();\r
- }\r
- }\r
- }\r
- }\r
-\r
- void process_outer_join(list<host_tuple>& result){\r
- int i;\r
- bool failed;\r
- host_tuple empty_tuple;\r
- empty_tuple.tuple_size = 0; empty_tuple.data = NULL;\r
-\r
- hash_empty = true;\r
- typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;\r
- for(i=0;i<2;++i){\r
- if(!join_tbl[i].empty()){\r
- if(join_type & (i+1)){\r
- for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){\r
-// Outer join processing\r
- if( ! (*jti).first->is_touched() ){\r
- host_tuple otup;\r
- if(i==0)\r
- otup = func.create_output_tuple( (*jti).second, empty_tuple, failed );\r
- else\r
- otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );\r
- if(!failed){\r
- otup.channel = output_channel;\r
- result.push_back(otup);\r
- }\r
- }\r
-// end outer join processing\r
-\r
- delete((*jti).first);\r
- (*jti).second.free_tuple();\r
- }\r
- }else{\r
- for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){\r
- delete((*jti).first);\r
- (*jti).second.free_tuple();\r
- }\r
- }\r
- }\r
- join_tbl[i].clear(); join_tbl[i].rehash();\r
- }\r
-\r
- }\r
-\r
-public:\r
- join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {\r
- join_type = jtype;\r
- max_input_ts[0] = NULL; max_input_ts[1] = NULL;\r
- max_input_tuple[0].data = max_input_tuple_data[0];\r
- max_input_tuple[1].data = max_input_tuple_data[1];\r
-\r
- curr_ts = new timestamp();\r
- curr_ts_valid = false;\r
- hash_empty = true;\r
- soft_queue_size_limit = size_limit;\r
-\r
- sch0=schema_handle0;\r
- sch1=schema_handle1;\r
-n_calls=0; n_iters=0; n_eqk=0;\r
-\r
- queue_mem = 0;\r
- }\r
-\r
- int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
- bool do_join_update = false;\r
- int i;\r
- bool failed;\r
-\r
-// Dummy tuple for outer join processing.\r
- host_tuple empty_tuple;\r
- empty_tuple.tuple_size = 0; empty_tuple.data = NULL;\r
-\r
-\r
- if (tup.channel > 1) {\r
- gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);\r
- return 0;\r
- }\r
-\r
- bool is_temp_tuple = func.temp_status_received(tup);\r
-\r
-// Ensure that the queue ts is initialized.\r
- if(max_input_ts[tup.channel] == NULL){\r
- max_input_ts[tup.channel] = new timestamp();\r
- if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){\r
- tup.free_tuple();\r
- delete max_input_ts[tup.channel];\r
- max_input_ts[tup.channel] = NULL;\r
- return(0); // can't load ts -- bail out.\r
- }\r
-\r
- if( max_input_ts[1-tup.channel]){\r
- int qcmp = compare_qts();\r
- if(qcmp<=0){\r
- func.load_ts_from_ts(curr_ts, max_input_ts[0]);\r
- }else{\r
- func.load_ts_from_ts(curr_ts, max_input_ts[1]);\r
- }\r
- curr_ts_valid = true;\r
- }\r
- }\r
-\r
-// reject "out of order" tuple - silently.\r
- timestamp tup_ts;\r
- if(! func.load_ts_from_tup(&tup_ts,tup)){\r
- tup.free_tuple();\r
- return(0); // can't load ts -- bail out.\r
- }\r
-\r
- int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);\r
- if (tup_order < 0){\r
-printf("out of order ts.\n");\r
- tup.free_tuple();\r
-\r
- // even for out of order temporal tuples we need to post new temporal tuple\r
- if (is_temp_tuple) {\r
- host_tuple temp_tup;\r
- temp_tup.channel = output_channel;\r
- if (!get_temp_status(temp_tup))\r
- result.push_back(temp_tup);\r
- }\r
- return 0;\r
- }\r
-\r
-// Update max if larger\r
- if(tup_order > 0){\r
- func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);\r
-\r
- // save the content of the max tuple\r
- max_input_tuple[tup.channel].channel = tup.channel;\r
- max_input_tuple[tup.channel].tuple_size = tup.tuple_size;\r
- memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);\r
-\r
-// do_join_update = true;\r
- }\r
-\r
-// Add to input queue if it passes the prefilter.\r
- if(!is_temp_tuple && func.apply_prefilter(tup)){\r
- if(append_tuple(tup,tup.channel) == 1){\r
- do_join_update = true; // added tuple to empty queue\r
- }\r
- }else{\r
- tup.free_tuple();\r
- }\r
-\r
-// If status changed, apply tuples to join.\r
-// (updated max time, added tuple to empty queue)\r
-\r
-// clear queues, advance curr_ts\r
- if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){\r
- process_outer_join(result);\r
-\r
-\r
- int minq = 0;\r
- if(compare_qts() > 0)\r
- minq = 1;\r
- if(input_queue[minq].empty())\r
- func.load_ts_from_ts(curr_ts,max_input_ts[minq]);\r
- else\r
- func.load_ts_from_tup(curr_ts,input_queue[minq].front());\r
- }\r
-\r
-// Process any tuples to be joined.\r
- process_join(result);\r
-\r
-\r
- // post new temporal tuple\r
-\r
- if(is_temp_tuple) {\r
- host_tuple temp_tup;\r
- temp_tup.channel = output_channel;\r
- if (!get_temp_status(temp_tup))\r
- result.push_back(temp_tup);\r
- }\r
-\r
- return 0;\r
- }\r
-\r
- int flush(list<host_tuple>& result) {\r
-\r
- process_outer_join(result);\r
-\r
- int minq = 0;\r
- if(compare_qts() > 0)\r
- minq = 1;\r
-\r
- if(input_queue[minq].empty())\r
- func.load_ts_from_ts(curr_ts,max_input_ts[minq]);\r
- else\r
- func.load_ts_from_tup(curr_ts,input_queue[minq].front());\r
-\r
- process_join(result);\r
-\r
- return 0;\r
- }\r
-\r
- int set_param_block(int sz, void * value) {\r
- func.set_param_block(sz, value);\r
- return 0;\r
- }\r
-\r
-\r
- int get_temp_status(host_tuple& result) {\r
-// temp tuple timestamp should be minimum between\r
-// minimums of all input queues\r
-\r
- // find the inputstream in minimum lowebound of the timestamp\r
- int qcmp = compare_qts();\r
- int minq = 0; if(qcmp>0) minq = 1;\r
-\r
- host_tuple empty_tuple;\r
- empty_tuple.tuple_size = 0; empty_tuple.data = NULL;\r
- host_tuple& left_tuple = empty_tuple;\r
- host_tuple& right_tuple = empty_tuple;\r
-\r
- if (minq == 0) {\r
- if(max_input_ts[minq]) {\r
- if (input_queue[minq].empty())\r
- left_tuple = max_input_tuple[minq];\r
- else\r
- left_tuple = input_queue[minq].front();\r
- }\r
- } else {\r
- if(max_input_ts[minq]) {\r
- if (input_queue[minq].empty())\r
- right_tuple = max_input_tuple[minq];\r
- else\r
- right_tuple = input_queue[minq].front();\r
- }\r
- }\r
-\r
- result.channel = output_channel;\r
- return func.create_temp_status_tuple(left_tuple, right_tuple, result);\r
- }\r
-\r
-\r
- int get_blocked_status () {\r
- if(input_queue[0].size() > soft_queue_size_limit) return(0);\r
- if(input_queue[1].size() > soft_queue_size_limit) return(1);\r
- return -1;\r
- }\r
-\r
- unsigned int get_mem_footprint() {\r
- return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;\r
- }\r
-};\r
-\r
-#endif // JOIN_EQ_HASH_OPERATOR_H\r
-\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#ifndef JOIN_EQ_HASH_OPERATOR_H
+#define JOIN_EQ_HASH_OPERATOR_H
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+#include"hash_table.h"
+using namespace std;
+
+#include <stdio.h>
+
+#define JOIN_OP_INNER_JOIN 0
+#define JOIN_OP_LEFT_OUTER_JOIN 1
+#define JOIN_OP_RIGHT_OUTER_JOIN 2
+#define JOIN_OP_OUTER_JOIN 3
+
+
+#define MAX_TUPLE_SIZE 10240
+
+template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
+class join_eq_hash_operator : public base_operator {
+private :
+ // type of join : inner vs. outer
+ unsigned int join_type;
+int n_calls, n_iters, n_eqk;
+
+ // for tracing
+ int sch0, sch1;
+
+ // list of tuples from one of the channel waiting to be compared
+ // against tuple from the other channel
+ // Normally at least one should be empty after processing accept_tuple
+ list<host_tuple> input_queue[2];
+
+// Admission control timestamp objects
+ timestamp *max_input_ts[2]; // largest timestamp received on this channel
+ // perhaps from a temporal tuple
+ timestamp *curr_ts; // current ts being processed.
+ bool curr_ts_valid; // both channels have a ts so curr_ts has been
+ // assigned a value
+
+ bool hash_empty; // always true, seems an artifact
+
+
+// max tuples received from input channels
+ char max_input_tuple_data[2][MAX_TUPLE_SIZE];
+ host_tuple max_input_tuple[2];
+
+// The hash tables for the join algorithm
+ hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];
+
+
+ // comparator object used to provide methods for performing the joins.
+ join_eq_hash_functor func;
+
+ // soft limit on queue size - we consider operator blocked on its input
+ // whenever we reach this soft limit (not used anymore)
+ size_t soft_queue_size_limit;
+
+// For matching on join hash key
+ equal_func equal_key;
+
+ // memory footprint for the join queues in bytes
+ unsigned int queue_mem;
+
+
+ // appends tuple to the end of the one of the input queues
+ // if tuple is stack resident, makes it heap resident
+ int append_tuple(host_tuple& tup, int q) {
+ int ret = input_queue[q].empty() ? 1 : 2;
+ if (!tup.heap_resident) {
+ char* data = (char*)malloc(tup.tuple_size);
+ memcpy(data, tup.data, tup.tuple_size);
+ tup.data = data;
+ tup.heap_resident = true;
+ }
+ input_queue[q].push_back(tup);
+ queue_mem += tup.tuple_size;
+ return ret;
+ }
+
+// -1 if input queue i has smaller ts, 0 it equal, 1 if curr_ts is smaller
+ int compare_qts_to_hashts(int i){
+ timestamp tmp_ts;
+ if(max_input_ts[i] == NULL) return(-1);
+//printf("compare_qts_to_hashts channel %d: ",i);
+ if(input_queue[i].empty())
+ return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
+ func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
+ return(func.compare_ts_with_ts(&tmp_ts, curr_ts));
+ }
+
+// -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal
+ int compare_qts(){
+ if(max_input_ts[0] == NULL) return(-1);
+ if(max_input_ts[1] == NULL) return(1);
+ timestamp tmp_lts, tmp_rts, *lts,*rts;
+
+ if(input_queue[0].empty()){
+ lts = max_input_ts[0];
+ }else{
+ func.load_ts_from_tup(&tmp_lts, input_queue[0].front());
+ lts = &tmp_lts;
+ }
+
+ if(input_queue[1].empty()){
+ rts = max_input_ts[1];
+ }else{
+ func.load_ts_from_tup(&tmp_rts, input_queue[1].front());
+ rts = &tmp_rts;
+ }
+
+//printf("compare_qts : ");
+ return(func.compare_ts_with_ts(lts,rts));
+ }
+
+ int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
+ timestamp tmp_ts;
+ func.load_ts_from_tup(&tmp_ts, tup);
+//printf("compare_tup_with_ts channel %d: ",tup.channel);
+ return(func.compare_ts_with_ts(&tmp_ts, ts));
+ }
+
+ void process_join(list<host_tuple>& result){
+ int i;
+ for(i=0;i<2;++i){
+//printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
+ while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
+// apply tuples to join
+ int other = 1-i; // the other channel
+ bool failed;
+
+// Get tuple from list
+ host_tuple qtup = input_queue[i].front();
+ input_queue[i].pop_front();
+ queue_mem -= qtup.tuple_size;
+
+// Put it into its join table
+ hashkey *qtup_key = func.create_key(qtup,failed); // on heap
+ if(failed){
+ qtup.free_tuple();
+ continue;
+ }
+ join_tbl[i].insert(qtup_key, qtup);
+
+// Join with matching tuples in other table.
+
+ typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);
+ while(jti != join_tbl[other].end()){
+ if(equal_key((*jti).first, qtup_key)){
+ host_tuple otup;
+ if(i==0)
+ otup = func.create_output_tuple( qtup, (*jti).second, failed );
+ else
+ otup = func.create_output_tuple( (*jti).second, qtup, failed );
+ if(!failed){
+ otup.channel = output_channel;
+ result.push_back(otup);
+ qtup_key->touch();
+ (*jti).first->touch();
+ }
+ }
+ jti = jti.next();
+ }
+ }
+ }
+ }
+
+ void process_outer_join(list<host_tuple>& result){
+ int i;
+ bool failed;
+ host_tuple empty_tuple;
+ empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
+
+//printf("Processing outer join\n");
+
+ hash_empty = true;
+ typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
+ for(i=0;i<2;++i){
+ if(!join_tbl[i].empty()){
+ if(join_type & (i+1)){
+ for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
+// Outer join processing
+ if( ! (*jti).first->is_touched() ){
+ host_tuple otup;
+ if(i==0)
+ otup = func.create_output_tuple( (*jti).second, empty_tuple, failed );
+ else
+ otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );
+ if(!failed){
+ otup.channel = output_channel;
+ result.push_back(otup);
+ }
+ }
+// end outer join processing
+
+ delete((*jti).first);
+ (*jti).second.free_tuple();
+ }
+ }else{
+ for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
+ delete((*jti).first);
+ (*jti).second.free_tuple();
+ }
+ }
+ }
+ join_tbl[i].clear(); join_tbl[i].resize();
+ }
+
+ }
+
+public:
+ join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {
+ join_type = jtype;
+ max_input_ts[0] = NULL; max_input_ts[1] = NULL;
+ max_input_tuple[0].data = max_input_tuple_data[0];
+ max_input_tuple[1].data = max_input_tuple_data[1];
+
+ curr_ts = new timestamp();
+ curr_ts_valid = false;
+ hash_empty = true;
+ soft_queue_size_limit = size_limit;
+
+ sch0=schema_handle0;
+ sch1=schema_handle1;
+n_calls=0; n_iters=0; n_eqk=0;
+
+ queue_mem = 0;
+ }
+
+ int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+ bool do_join_update = false;
+ int i;
+ bool failed;
+
+// Dummy tuple for outer join processing.
+ host_tuple empty_tuple;
+ empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
+
+
+ if (tup.channel > 1) {
+ gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);
+ return 0;
+ }
+
+ bool is_temp_tuple = func.temp_status_received(tup);
+
+// Ensure that the queue ts is initialized.
+ if(max_input_ts[tup.channel] == NULL){
+//printf("Loading channel %d\n",tup.channel);
+ max_input_ts[tup.channel] = new timestamp();
+ if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
+ tup.free_tuple();
+ delete max_input_ts[tup.channel];
+ max_input_ts[tup.channel] = NULL;
+ return(0); // can't load ts -- bail out.
+ }
+
+ if( max_input_ts[1-tup.channel]){
+ int qcmp = compare_qts();
+ if(qcmp<=0){
+ func.load_ts_from_ts(curr_ts, max_input_ts[0]);
+ }else{
+ func.load_ts_from_ts(curr_ts, max_input_ts[1]);
+ }
+ curr_ts_valid = true;
+ }
+ }
+
+// reject "out of order" tuple - silently.
+ timestamp tup_ts;
+ if(! func.load_ts_from_tup(&tup_ts,tup)){
+ tup.free_tuple();
+ return(0); // can't load ts -- bail out.
+ }
+
+//printf("accept_tuple channel=%d: ",tup.channel);
+ int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
+ if (tup_order < 0){
+ printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
+ tup.free_tuple();
+
+ // even for out of order temporal tuples we need to post new temporal tuple
+ if (is_temp_tuple) {
+ host_tuple temp_tup;
+ temp_tup.channel = output_channel;
+ if (!get_temp_status(temp_tup))
+ result.push_back(temp_tup);
+ }
+ return 0;
+ }
+
+// Update max if larger
+ if(tup_order > 0){
+//printf("Loading channel %d\n",tup.channel);
+ func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
+
+ // save the content of the max tuple
+ max_input_tuple[tup.channel].channel = tup.channel;
+ max_input_tuple[tup.channel].tuple_size = tup.tuple_size;
+ memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);
+
+// do_join_update = true;
+ }
+
+// Add to input queue if it passes the prefilter.
+ if(!is_temp_tuple && func.apply_prefilter(tup)){
+ if(append_tuple(tup,tup.channel) == 1){
+ do_join_update = true; // added tuple to empty queue
+ }
+ }else{
+ tup.free_tuple();
+ }
+
+// If status changed, apply tuples to join.
+// (updated max time, added tuple to empty queue)
+
+// clear queues, advance curr_ts
+ if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){
+ process_outer_join(result);
+
+
+ int minq = 0;
+ if(compare_qts() > 0)
+ minq = 1;
+ if(input_queue[minq].empty())
+ func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
+ else
+ func.load_ts_from_tup(curr_ts,input_queue[minq].front());
+ }
+
+// Process any tuples to be joined.
+ process_join(result);
+
+
+ // post new temporal tuple
+
+ if(is_temp_tuple) {
+ host_tuple temp_tup;
+ temp_tup.channel = output_channel;
+ if (!get_temp_status(temp_tup))
+ result.push_back(temp_tup);
+ }
+
+ return 0;
+ }
+
+ int flush(list<host_tuple>& result) {
+//printf("Calling flush\n");
+
+ process_outer_join(result);
+
+ int minq = 0;
+ if(compare_qts() > 0)
+ minq = 1;
+
+ if(input_queue[minq].empty())
+ func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
+ else
+ func.load_ts_from_tup(curr_ts,input_queue[minq].front());
+
+ process_join(result);
+
+ return 0;
+ }
+
+ int set_param_block(int sz, void * value) {
+ func.set_param_block(sz, value);
+ return 0;
+ }
+
+
+ int get_temp_status(host_tuple& result) {
+// temp tuple timestamp should be minimum between
+// minimums of all input queues
+
+ // find the inputstream in minimum lowebound of the timestamp
+ int qcmp = compare_qts();
+ int minq = 0; if(qcmp>0) minq = 1;
+
+ timestamp left_ts, right_ts;
+ timestamp *left_ts_ptr = &left_ts;
+ timestamp *right_ts_ptr = &right_ts;
+
+
+/*
+ if (input_queue[0].empty()){
+ printf("L=max_ts, ");
+ left_ts_ptr = max_input_ts[0];
+ }else{
+ printf("L=q, ");
+ func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
+ }
+
+ if (input_queue[1].empty()){
+ printf("R=max_ts, ");
+ right_ts_ptr = max_input_ts[1];
+ }else{
+ printf("L=q, ");
+ func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
+ }
+*/
+ if(curr_ts_valid){
+ left_ts_ptr = curr_ts;
+ right_ts_ptr = curr_ts;
+ }else{
+//printf("curr_ts invalid\n");
+ left_ts_ptr = NULL;
+ right_ts_ptr = NULL;
+ }
+
+ result.channel = output_channel;
+ return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
+ }
+
+
+ int get_blocked_status () {
+ if(input_queue[0].size() > soft_queue_size_limit) return(0);
+ if(input_queue[1].size() > soft_queue_size_limit) return(1);
+ return -1;
+ }
+
+ unsigned int get_mem_footprint() {
+ return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;
+ }
+};
+
+#endif // JOIN_EQ_HASH_OPERATOR_H
+