1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef JOIN_EQ_HASH_OPERATOR_H
17 #define JOIN_EQ_HASH_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include"hash_table.h"
// Join-type codes stored in join_operator's join_type member. The outer-join
// code tests them as a bitmask via (join_type & (i+1)): bit value 1 (LEFT)
// enables unmatched-tuple output for channel 0 and bit value 2 (RIGHT) for
// channel 1, so JOIN_OP_OUTER_JOIN (3) enables both and JOIN_OP_INNER_JOIN (0)
// neither. (Assumes i there is the channel index 0/1; the loop header is not
// visible in this listing.)
27 #define JOIN_OP_INNER_JOIN 0
28 #define JOIN_OP_LEFT_OUTER_JOIN 1
29 #define JOIN_OP_RIGHT_OUTER_JOIN 2
30 #define JOIN_OP_OUTER_JOIN 3
// Size in bytes of each per-channel buffer (max_input_tuple_data) holding a
// copy of the latest max-timestamp tuple.
// NOTE(review): no check that a tuple fits in this bound is visible before the
// copy into that buffer in accept_tuple; confirm callers guarantee it.
33 #define MAX_TUPLE_SIZE 1024
// join_eq_hash_operator: two-input equi-join operator over timestamped streams.
// Tuples from each channel (0 and 1) are queued, then moved into a per-channel
// hash table once their timestamp equals curr_ts, where they are probed
// against the other channel's table. For outer joins, unmatched rows can be
// emitted paired with an empty tuple.
//
// Template parameters (roles inferred from how this file uses them):
//   join_eq_hash_functor    - builds keys, output tuples and temporal tuples,
//                             and loads/compares timestamps (the func.* calls).
//   timestamp               - timestamp value type, handled through func.
//   hashkey                 - join key created by func.create_key and released
//                             with delete; supports touch()/is_touched().
//   hasher_func, equal_func - hash and equality policies for the hash tables.
//
// NOTE(review): this listing appears to be missing lines (blank lines, closing
// braces, some declarations and branches). These comments describe only the
// visible lines.
35 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
36 class join_eq_hash_operator : public base_operator {
38 // type of join : inner vs. outer
// Holds one of the JOIN_OP_* codes; tested as a bitmask with (join_type & (i+1)).
39 unsigned int join_type;
// Counters zeroed in the constructor; not otherwise used in the visible code.
40 int n_calls, n_iters, n_eqk;
45 // list of tuples from one of the channel waiting to be compared
46 // against tuple from the other channel
// Indexed by channel number. append_tuple marks every queued tuple heap
// resident before pushing it.
47 list<host_tuple> input_queue[2];
49 // Admission control timestamp objects
// max_input_ts[c]: largest timestamp accepted so far on channel c, or NULL
// until the first tuple with a loadable timestamp arrives on that channel.
// curr_ts: the timestamp whose tuples process_join moves into the hash
// tables; it is only used while curr_ts_valid is true.
// All of these are allocated with new.
// NOTE(review): no destructor releasing them is visible in this listing.
50 timestamp *max_input_ts[2], *curr_ts;
// NOTE(review): hash_empty is not read or written anywhere in the visible code.
51 bool hash_empty, curr_ts_valid;
53 // max tuples received from input channels
// Per-channel copy of the latest max-timestamp tuple. The constructor points
// max_input_tuple[c].data at max_input_tuple_data[c].
54 char max_input_tuple_data[2][MAX_TUPLE_SIZE];
55 host_tuple max_input_tuple[2];
57 // The hash tables for the join algorithm
// join_tbl[c] holds channel c's tuples for the current timestamp. Its keys are
// deleted and its tuples freed (free_tuple) when process_outer_join clears it.
58 hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];
61 // comparator object used to provide methods for performing the joins.
62 join_eq_hash_functor func;
64 // soft limit on queue size - we consider operator blocked on its input
65 // whenever we reach this soft limit (not used anymore)
// Compared against each input queue's length in get_blocked_status.
66 size_t soft_queue_size_limit;
68 // For matching on join hash key
// NOTE(review): the member this comment describes is missing from this listing.
// process_join calls equal_key(...), presumably an equal_func member; verify.
71 // memory footprint for the join queues in bytes
// Sum of tuple_size over all queued tuples. It is incremented in append_tuple
// and decremented in process_join.
72 unsigned int queue_mem;
75 // Appends tup to the end of input queue q.
76 // If tup is not heap resident, its data is first copied to the heap.
//
// @param tup  tuple to enqueue; modified in place when it is not heap resident.
// @param q    input queue (channel) index, 0 or 1.
// ret records the queue state before the append: 1 if the queue was empty,
// 2 otherwise. accept_tuple compares the result with 1 to request a join update.
// NOTE(review): the statement returning ret is not visible in this listing.
77 int append_tuple(host_tuple& tup, int q) {
78 int ret = input_queue[q].empty() ? 1 : 2;
79 if (!tup.heap_resident) {
// NOTE(review): the malloc result is not checked for NULL before memcpy.
80 char* data = (char*)malloc(tup.tuple_size);
81 memcpy(data, tup.data, tup.tuple_size);
// NOTE(review): as listed, data is never stored into tup.data. That would leak
// the copy and leave the queued tuple pointing at the caller's buffer while
// flagged heap_resident. Presumably a dropped line performs
// `tup.data = data;`; verify against the full source.
83 tup.heap_resident = true;
85 input_queue[q].push_back(tup);
// Track queued bytes for get_mem_footprint.
86 queue_mem += tup.tuple_size;
90 // -1 if input queue i has smaller ts, 0 if equal, 1 if curr_ts is smaller
// Compares the current timestamp of input queue i against curr_ts.
// A queue's timestamp is that of its front tuple, or max_input_ts[i] when the
// queue is empty. A channel that has no timestamp yet (NULL) always compares
// as smaller.
// The result is whatever func.compare_ts_with_ts returns; the -1/0/1 reading
// above assumes that function uses the usual negative/zero/positive convention.
// NOTE(review): the declaration of tmp_ts is not visible in this listing.
91 int compare_qts_to_hashts(int i){
93 if(max_input_ts[i] == NULL) return(-1);
94 if(input_queue[i].empty())
95 return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
96 func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
97 return(func.compare_ts_with_ts(&tmp_ts, curr_ts));
100 // -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal
// compare_qts(): compares the current timestamps of input queues 0 and 1.
// Each queue's timestamp is its front tuple's ts, or max_input_ts[c] when the
// queue is empty. A channel that has not yet seen a timestamp (NULL) compares
// as smaller. Channel 0 is checked first, so if neither has one the result
// is -1.
// NOTE(review): this listing omits several lines:
//   - the function signature (callers invoke it as compare_qts());
//   - the else branches;
//   - the assignments that point lts/rts at tmp_lts/tmp_rts after loading
//     from the front tuple.
// As listed, lts/rts would be uninitialized when a queue is non-empty; verify.
102 if(max_input_ts[0] == NULL) return(-1);
103 if(max_input_ts[1] == NULL) return(1);
104 timestamp tmp_lts, tmp_rts, *lts,*rts;
106 if(input_queue[0].empty()){
107 lts = max_input_ts[0];
109 func.load_ts_from_tup(&tmp_lts, input_queue[0].front());
113 if(input_queue[1].empty()){
114 rts = max_input_ts[1];
116 func.load_ts_from_tup(&tmp_rts, input_queue[1].front());
120 return(func.compare_ts_with_ts(lts,rts));
// Compares the timestamp carried by tup with *ts. The sign of the result is
// whatever func.compare_ts_with_ts(tup's ts, ts) returns.
// NOTE(review): the return value of func.load_ts_from_tup is ignored here,
// although accept_tuple treats a false return as "can't load ts".
// The declaration of tmp_ts is not visible in this listing.
123 int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
125 func.load_ts_from_tup(&tmp_ts, tup);
126 return(func.compare_ts_with_ts(&tmp_ts, ts));
// Joins the tuples whose timestamp equals curr_ts. For each such tuple at the
// front of input queue i:
//   - move it into join_tbl[i];
//   - probe join_tbl[other] with its key;
//   - append one output tuple to result for every stored entry whose key
//     matches under equal_key.
// Matched stored keys are marked with touch(), so process_outer_join can tell
// them apart from unmatched ones.
//
// NOTE(review): several lines are missing from this listing. Presumably they
// include:
//   - a loop over i in {0,1};
//   - the declarations of failed and otup;
//   - handling of a failed create_key;
//   - the condition choosing argument order for create_output_tuple;
//   - the advancement of jti.
// As listed, the inner while loop never advances jti; verify against the full
// source.
// NOTE(review): only the stored key ((*jti).first) is touched in the visible
// lines. If qtup_key is never touched on a match, an outer join would later
// also emit qtup as unmatched; confirm.
129 void process_join(list<host_tuple>& result){
132 while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
133 // apply tuples to join
134 int other = 1-i; // the other channel
137 // Get tuple from list
138 host_tuple qtup = input_queue[i].front();
139 input_queue[i].pop_front();
// The tuple is no longer queued; from here on join_tbl[i] holds it.
140 queue_mem -= qtup.tuple_size;
142 // Put it into its join table
143 hashkey *qtup_key = func.create_key(qtup,failed); // on heap
148 join_tbl[i].insert(qtup_key, qtup);
150 // Join with matching tuples in other table.
// Each candidate from find() is re-checked with equal_key, presumably because
// the table can yield entries that only share a hash bucket (assumption).
152 typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);
153 while(jti != join_tbl[other].end()){
154 if(equal_key((*jti).first, qtup_key)){
// The two calls mirror each other so that one channel's tuple is always the
// first argument. The condition selecting between them is not visible.
157 otup = func.create_output_tuple( qtup, (*jti).second, failed );
159 otup = func.create_output_tuple( (*jti).second, qtup, failed );
161 otup.channel = output_channel;
162 result.push_back(otup);
// Mark the stored key as matched for outer-join processing.
164 (*jti).first->touch();
// Empties the join tables. accept_tuple calls this once both input queues
// have moved past curr_ts, and flush calls it unconditionally.
//
// For a channel i whose bit (i+1) is set in join_type:
//   - every stored tuple whose key was never touched (found no match) is
//     emitted paired with an empty tuple (tuple_size 0, data NULL);
//   - the empty tuple takes the missing side's argument position.
// In every case each key is deleted and each stored tuple freed. The table is
// then cleared and rehashed.
//
// NOTE(review): the following are not visible in this listing:
//   - the loop over i;
//   - the declarations of failed and otup;
//   - the condition selecting argument order;
//   - the else introducing the second (free-only) loop.
// NOTE(review): create_output_tuple must accept a partner with NULL data
// (assumption about the functor).
173 void process_outer_join(list<host_tuple>& result){
176 host_tuple empty_tuple;
177 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
180 typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
182 if(!join_tbl[i].empty()){
183 if(join_type & (i+1)){
184 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
185 // Outer join processing
186 if( ! (*jti).first->is_touched() ){
189 otup = func.create_output_tuple( (*jti).second, empty_tuple, failed );
191 otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );
193 otup.channel = output_channel;
194 result.push_back(otup);
197 // end outer join processing
// Release the key and the tuple owned by the table.
199 delete((*jti).first);
200 (*jti).second.free_tuple();
// Presumably the branch for channels without outer-join output: release the
// entries without emitting anything.
203 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
204 delete((*jti).first);
205 (*jti).second.free_tuple();
209 join_tbl[i].clear(); join_tbl[i].rehash();
// Constructs the operator.
// @param schema_handle0, schema_handle1  forwarded to the functor's constructor.
// @param jtype       join type, one of the JOIN_OP_* codes.
// @param name        forwarded to base_operator.
// @param size_limit  soft input-queue length limit checked by get_blocked_status.
// NOTE(review): the visible lines never do the following:
//   - store jtype into join_type;
//   - zero queue_mem, although append_tuple and process_join update it with
//     += / -=;
//   - initialize hash_empty.
// Presumably dropped lines handle these; verify. No access specifier is
// visible either; presumably a dropped `public:` precedes this constructor.
215 join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {
// Neither channel has a timestamp yet; accept_tuple allocates these on first use.
217 max_input_ts[0] = NULL; max_input_ts[1] = NULL;
// Point the saved max tuples at their fixed-size in-object buffers.
218 max_input_tuple[0].data = max_input_tuple_data[0];
219 max_input_tuple[1].data = max_input_tuple_data[1];
221 curr_ts = new timestamp();
222 curr_ts_valid = false;
224 soft_queue_size_limit = size_limit;
228 n_calls=0; n_iters=0; n_eqk=0;
// Accepts one input tuple and appends any resulting output (joined tuples,
// outer-join padding, temporal tuples) to result.
//
// Visible steps:
//  1. Log a LOG_ALERT for a channel number greater than 1.
//  2. On a channel's first tuple, initialize max_input_ts[channel] from it,
//     returning 0 if the timestamp can't be loaded. Once both channels have a
//     timestamp, set curr_ts from one of them (selected using compare_qts,
//     presumably the smaller) and mark it valid.
//  3. Load the tuple's timestamp (return 0 on failure) and compare it with
//     max_input_ts. Out-of-order tuples are reported, and per the comment
//     still post a new temporal tuple.
//  4. Update max_input_ts and save a copy of the tuple in max_input_tuple.
//  5. Queue the tuple unless it is a temporal tuple or fails the prefilter.
//     Adding to an empty queue requests a join update.
//  6. Join update:
//     - if both queues are past curr_ts, run outer-join cleanup;
//     - move curr_ts to the smaller queue's timestamp;
//     - run process_join.
//  7. Post a new temporal tuple.
// @return 0 on the visible bail-out paths; other return values are not visible.
//
// NOTE(review): many lines are missing from this listing, including:
//   - the declarations of tup_ts, temp_tup and minq;
//   - the condition guarding the out-of-order branch;
//   - the do_join_update guard;
//   - closing braces and returns.
// sch0 and sch1, used in the log message, are not declared in the visible code.
233 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
234 bool do_join_update = false;
238 // Dummy tuple for outer join processing.
// NOTE(review): empty_tuple is not used in the visible lines of this method.
239 host_tuple empty_tuple;
240 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
// NOTE(review): tup.channel later indexes two-element arrays. If its type is
// signed, a negative value would pass this check; the type is not visible.
243 if (tup.channel > 1) {
244 gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);
// Temporal tuples advance timestamps but are never queued for joining (step 5).
248 bool is_temp_tuple = func.temp_status_received(tup);
250 // Ensure that the queue ts is initialized.
251 if(max_input_ts[tup.channel] == NULL){
252 max_input_ts[tup.channel] = new timestamp();
253 if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
255 delete max_input_ts[tup.channel];
256 max_input_ts[tup.channel] = NULL;
257 return(0); // can't load ts -- bail out.
// Both channels now have a timestamp, so curr_ts can be seeded.
260 if( max_input_ts[1-tup.channel]){
261 int qcmp = compare_qts();
263 func.load_ts_from_ts(curr_ts, max_input_ts[0]);
265 func.load_ts_from_ts(curr_ts, max_input_ts[1]);
267 curr_ts_valid = true;
271 // reject "out of order" tuple - silently.
273 if(! func.load_ts_from_tup(&tup_ts,tup)){
275 return(0); // can't load ts -- bail out.
278 int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
// NOTE(review): this prints debug output to stdout from an operator, although
// the comment above says "silently"; consider gslog or removing it.
280 printf("out of order ts.\n");
283 // even for out of order temporal tuples we need to post new temporal tuple
286 temp_tup.channel = output_channel;
// The temporal tuple is emitted only when get_temp_status returns 0.
287 if (!get_temp_status(temp_tup))
288 result.push_back(temp_tup);
293 // Update max if larger
295 func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
297 // save the content of the max tuple
298 max_input_tuple[tup.channel].channel = tup.channel;
299 max_input_tuple[tup.channel].tuple_size = tup.tuple_size;
// NOTE(review): max_input_tuple[c].data points at a MAX_TUPLE_SIZE-byte
// buffer, and no check that tup.tuple_size <= MAX_TUPLE_SIZE is visible.
// An oversized tuple would overflow max_input_tuple_data.
300 memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);
302 // do_join_update = true;
305 // Add to input queue if it passes the prefilter.
306 if(!is_temp_tuple && func.apply_prefilter(tup)){
307 if(append_tuple(tup,tup.channel) == 1){
308 do_join_update = true; // added tuple to empty queue
314 // If status changed, apply tuples to join.
315 // (updated max time, added tuple to empty queue)
317 // clear queues, advance curr_ts
318 if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){
319 process_outer_join(result);
// Select the input queue with the smaller timestamp (minq) and move curr_ts to it.
323 if(compare_qts() > 0)
325 if(input_queue[minq].empty())
326 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
328 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
331 // Process any tuples to be joined.
332 process_join(result);
335 // post new temporal tuple
339 temp_tup.channel = output_channel;
340 if (!get_temp_status(temp_tup))
341 result.push_back(temp_tup);
// Flushes the operator:
//   - runs outer-join cleanup on the hash tables;
//   - moves curr_ts to the timestamp of the input queue that compare_qts
//     reports as smaller;
//   - joins the tuples at that timestamp.
// Output is appended to result.
// NOTE(review): the declaration and update of minq, the else, and the return
// are not visible in this listing.
// NOTE(review): if channel 0 has never received a timestamped tuple,
// compare_qts() returns -1, so minq presumably stays 0. input_queue[0] is
// then empty, because tuples are queued only after max_input_ts is set. As a
// result load_ts_from_ts is passed max_input_ts[0] == NULL; verify the functor
// tolerates it, or guard.
// NOTE(review): only one process_join pass is visible. Queued tuples with later
// timestamps are not processed here unless dropped lines loop.
347 int flush(list<host_tuple>& result) {
349 process_outer_join(result);
352 if(compare_qts() > 0)
355 if(input_queue[minq].empty())
356 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
358 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
360 process_join(result);
// Forwards a parameter block (size sz, contents at value) to the join functor.
// NOTE(review): the return statement is not visible in this listing.
365 int set_param_block(int sz, void * value) {
366 func.set_param_block(sz, value);
// Builds a temporal status tuple into result via func.create_temp_status_tuple.
// Its source is the "current" tuple of the input channel with the smaller
// timestamp: the queue's front tuple, or the saved max tuple if the queue is
// empty. That tuple is passed as the left or right argument, and the other
// side gets an empty tuple (tuple_size 0, data NULL).
// @return whatever create_temp_status_tuple returns. Callers push result only
//         when this returns 0.
// NOTE(review) BUG: left_tuple and right_tuple are references bound to the
// same object, empty_tuple. A C++ reference cannot be reseated, so
// `left_tuple = ...` and `right_tuple = ...` copy-assign into empty_tuple.
// Consequently both arguments of create_temp_status_tuple are always the same
// object, and the side meant to be empty is not empty. Use two separate
// host_tuple objects, or const host_tuple* pointers, instead.
// NOTE(review): both visible branches read channel minq. Presumably dropped
// `if (minq == 0)` / `else` lines choose which side is filled; verify.
371 int get_temp_status(host_tuple& result) {
372 // temp tuple timestamp should be minimum between
373 // minimums of all input queues
375 // find the input stream with the minimum lower bound of the timestamp
376 int qcmp = compare_qts();
377 int minq = 0; if(qcmp>0) minq = 1;
379 host_tuple empty_tuple;
380 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
381 host_tuple& left_tuple = empty_tuple;
382 host_tuple& right_tuple = empty_tuple;
385 if(max_input_ts[minq]) {
386 if (input_queue[minq].empty())
387 left_tuple = max_input_tuple[minq];
389 left_tuple = input_queue[minq].front();
392 if(max_input_ts[minq]) {
393 if (input_queue[minq].empty())
394 right_tuple = max_input_tuple[minq];
396 right_tuple = input_queue[minq].front();
400 result.channel = output_channel;
401 return func.create_temp_status_tuple(left_tuple, right_tuple, result);
// Reports which input queue has grown past soft_queue_size_limit: 0 if queue 0
// has, otherwise 1 if queue 1 has. Queue 0 is checked first.
// NOTE(review): the value returned when neither queue exceeds the limit is not
// visible in this listing.
405 int get_blocked_status () {
406 if(input_queue[0].size() > soft_queue_size_limit) return(0);
407 if(input_queue[1].size() > soft_queue_size_limit) return(1);
// Returns the operator's memory footprint: both hash tables' own footprints
// plus the bytes held by tuples waiting in the input queues (queue_mem, in
// bytes; the hash-table figures are presumably also in bytes).
411 unsigned int get_mem_footprint() {
412 return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;
416 #endif // JOIN_EQ_HASH_OPERATOR_H