1 /* ------------------------------------------------
\r
2 Copyright 2014 AT&T Intellectual Property
\r
3 Licensed under the Apache License, Version 2.0 (the "License");
\r
4 you may not use this file except in compliance with the License.
\r
5 You may obtain a copy of the License at
\r
7 http://www.apache.org/licenses/LICENSE-2.0
\r
9 Unless required by applicable law or agreed to in writing, software
\r
10 distributed under the License is distributed on an "AS IS" BASIS,
\r
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 See the License for the specific language governing permissions and
\r
13 limitations under the License.
\r
14 ------------------------------------------- */
\r
16 #ifndef JOIN_EQ_HASH_OPERATOR_H
\r
17 #define JOIN_EQ_HASH_OPERATOR_H
\r
19 #include "host_tuple.h"
\r
20 #include "base_operator.h"
\r
22 #include"hash_table.h"
\r
23 using namespace std;
\r
// Join-type codes stored in join_eq_hash_operator::join_type.
// process_outer_join() tests them as bit flags via (join_type & (i+1)):
// value 1 keeps unmatched tuples from channel 0's (left) table, value 2 keeps
// unmatched tuples from channel 1's (right) table, OUTER_JOIN (3) keeps both
// and INNER_JOIN (0) keeps neither.
27 #define JOIN_OP_INNER_JOIN 0
\r
28 #define JOIN_OP_LEFT_OUTER_JOIN 1
\r
29 #define JOIN_OP_RIGHT_OUTER_JOIN 2
\r
30 #define JOIN_OP_OUTER_JOIN 3
\r
// Byte capacity of each per-channel max_input_tuple_data buffer.
// NOTE(review): accept_tuple() memcpy's tup.tuple_size bytes into that buffer
// and no size check is visible in this view; confirm tuples can never exceed it.
33 #define MAX_TUPLE_SIZE 1024
\r
// Two-input equi-join operator. Tuples from channel 0 and channel 1 are
// buffered in per-channel input queues. They are moved into per-channel hash
// tables once their timestamp equals curr_ts, and probed against the other
// channel's table (see process_join / process_outer_join).
//
// Template parameters (roles inferred from the calls made in this class):
//   join_eq_hash_functor - query-specific helper: loads/compares timestamps,
//                          builds hash keys, output tuples and temporal tuples.
//   timestamp            - timestamp type handled by the functor.
//   hashkey              - heap-allocated join key exposing touch() and
//                          is_touched() (used to track outer-join matches).
//   hasher_func, equal_func - hash and equality functors for hashkey*.
35 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
\r
36 class join_eq_hash_operator : public base_operator {
\r
38 // type of join : inner vs. outer
\r
// One of the JOIN_OP_* codes; tested bitwise in process_outer_join().
39 unsigned int join_type;
\r
// Counters zeroed in the constructor; no other use is visible in this view.
40 int n_calls, n_iters, n_eqk;
\r
45 // list of tuples from one of the channels waiting to be compared
\r
46 // against tuples from the other channel
\r
47 list<host_tuple> input_queue[2];
\r
49 // Admission control timestamp objects
\r
// max_input_ts[i]: largest timestamp accepted on channel i. It is NULL until
// the first tuple on that channel is seen (allocated in accept_tuple()).
// curr_ts: timestamp whose queued tuples are currently being joined. It is
// only meaningful once curr_ts_valid is true.
50 timestamp *max_input_ts[2], *curr_ts;
\r
// NOTE(review): hash_empty is not read or written anywhere in this view.
51 bool hash_empty, curr_ts_valid;
\r
53 // latest max-timestamp tuple received on each input channel; its data
// bytes live in max_input_tuple_data (the constructor points
// max_input_tuple[i].data at max_input_tuple_data[i]).
54 char max_input_tuple_data[2][MAX_TUPLE_SIZE];
\r
55 host_tuple max_input_tuple[2];
\r
57 // The hash tables for the join algorithm
\r
// join_tbl[i] holds channel i's released tuples, keyed by heap-allocated
// hashkey* (deleted in process_outer_join()).
58 hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];
\r
61 // comparator object used to provide methods for performing the joins.
\r
62 join_eq_hash_functor func;
\r
64 // soft limit on queue size - we consider operator blocked on its input
\r
65 // whenever a queue grows past this soft limit (checked by get_blocked_status())
\r
66 size_t soft_queue_size_limit;
\r
68 // For matching on join hash key
\r
69 equal_func equal_key;
\r
71 // memory footprint for the join queues in bytes
\r
// Incremented in append_tuple() and decremented in process_join().
// NOTE(review): its initialization is not among the visible constructor
// lines; confirm it starts at 0.
72 unsigned int queue_mem;
\r
75 // appends tuple to the end of one of the input queues;
\r
76 // if the tuple is not heap resident, copies its data into a malloc'd buffer first
\r
// @param tup  tuple to enqueue; modified in place (heap_resident is set).
// @param q    input queue index (0 or 1).
// @return     ret: 1 if the queue was empty before the append, else 2.
//             accept_tuple() treats 1 as "queue just became non-empty".
//             The return statement itself is not visible in this view.
77 int append_tuple(host_tuple& tup, int q) {
\r
78 int ret = input_queue[q].empty() ? 1 : 2;
\r
79 if (!tup.heap_resident) {
\r
// NOTE(review): malloc() result is not checked for NULL.
80 char* data = (char*)malloc(tup.tuple_size);
\r
81 memcpy(data, tup.data, tup.tuple_size);
\r
// NOTE(review): the assignment of `data` back to tup.data is not visible in
// this view; confirm it exists.
83 tup.heap_resident = true;
\r
85 input_queue[q].push_back(tup);
\r
// Track bytes held by the queues (reported by get_mem_footprint()).
86 queue_mem += tup.tuple_size;
\r
90 // -1 if input queue i has smaller ts, 0 if equal, 1 if curr_ts is smaller
\r
// Queue i's timestamp is its front tuple's timestamp, or max_input_ts[i] when
// the queue is empty. A channel with no timestamp yet compares as smaller (-1).
// Otherwise the result is whatever func.compare_ts_with_ts() returns.
91 int compare_qts_to_hashts(int i){
\r
93 if(max_input_ts[i] == NULL) return(-1);
\r
94 if(input_queue[i].empty())
\r
95 return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
\r
// tmp_ts is declared on a line not visible here (presumably a local timestamp).
96 func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
\r
97 return(func.compare_ts_with_ts(&tmp_ts, curr_ts));
\r
100 // -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal
\r
// Presumably the body of compare_qts(), which is called elsewhere in this
// class; its signature line is not visible in this view.
// Each queue's timestamp is its front tuple's timestamp, or max_input_ts[i]
// when the queue is empty. A channel with no timestamp yet compares as
// smaller. Channel 0 is checked first, so -1 is returned when neither has one.
102 if(max_input_ts[0] == NULL) return(-1);
\r
103 if(max_input_ts[1] == NULL) return(1);
\r
104 timestamp tmp_lts, tmp_rts, *lts,*rts;
\r
106 if(input_queue[0].empty()){
\r
107 lts = max_input_ts[0];
\r
// Non-empty case: the front tuple's ts goes into tmp_lts. Pointing lts at it
// is presumably done on a line not visible here.
109 func.load_ts_from_tup(&tmp_lts, input_queue[0].front());
\r
113 if(input_queue[1].empty()){
\r
114 rts = max_input_ts[1];
\r
116 func.load_ts_from_tup(&tmp_rts, input_queue[1].front());
\r
120 return(func.compare_ts_with_ts(lts,rts));
\r
// Compares the timestamp carried by tup against *ts using
// func.compare_ts_with_ts(). tmp_ts is declared on a line not visible here.
// NOTE(review): the result of load_ts_from_tup() is ignored here, whereas
// accept_tuple() bails out when that call fails.
123 int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
\r
125 func.load_ts_from_tup(&tmp_ts, tup);
\r
126 return(func.compare_ts_with_ts(&tmp_ts, ts));
\r
// Moves queued tuples whose timestamp equals curr_ts into their channel's hash
// table and joins each against the other channel's table. Output tuples are
// appended to result. `i` (the current channel, 0 or 1) comes from a loop
// header that is not visible in this view.
129 void process_join(list<host_tuple>& result){
\r
// Only front tuples are examined: draining stops at the first tuple whose
// timestamp differs from curr_ts.
132 while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
\r
133 // apply tuples to join
\r
134 int other = 1-i; // the other channel
\r
137 // Get tuple from list
\r
138 host_tuple qtup = input_queue[i].front();
\r
139 input_queue[i].pop_front();
\r
140 queue_mem -= qtup.tuple_size;
\r
142 // Put it into its join table
\r
// `failed` is declared on a line not visible here; how it is handled after
// create_key() is likewise not shown.
143 hashkey *qtup_key = func.create_key(qtup,failed); // on heap
\r
148 join_tbl[i].insert(qtup_key, qtup);
\r
150 // Join with matching tuples in other table.
\r
152 typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);
\r
// equal_key() re-checks the full key before emitting. The iterator advance
// is on a line not visible here.
153 while(jti != join_tbl[other].end()){
\r
154 if(equal_key((*jti).first, qtup_key)){
\r
// Two argument orders, presumably chosen on i so the channel-0 tuple is always
// the first argument (the selecting condition is not visible).
157 otup = func.create_output_tuple( qtup, (*jti).second, failed );
\r
159 otup = func.create_output_tuple( (*jti).second, qtup, failed );
\r
161 otup.channel = output_channel;
\r
162 result.push_back(otup);
\r
// Mark the matched entry so process_outer_join() does not emit it as unmatched.
164 (*jti).first->touch();
\r
// Empties the join tables. Before releasing a table's entries, it emits the
// unmatched ("untouched") ones for the sides requested by join_type. Every
// entry's heap key is deleted and free_tuple() is called on its tuple; the
// table is then cleared and rehashed. `i` (table index, 0 or 1) comes from a
// loop header not visible in this view.
173 void process_outer_join(list<host_tuple>& result){
\r
// Zero-length placeholder for the missing side of an outer-join result.
176 host_tuple empty_tuple;
\r
177 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
\r
180 typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
\r
182 if(!join_tbl[i].empty()){
\r
// Bit (i+1) of join_type: 1 -> keep unmatched channel-0 tuples, 2 -> channel 1.
183 if(join_type & (i+1)){
\r
184 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
\r
185 // Outer join processing
\r
186 if( ! (*jti).first->is_touched() ){
\r
// Two argument orders, presumably selected by i so the table's tuple keeps its
// own side and empty_tuple pads the other (the condition is not visible).
189 otup = func.create_output_tuple( (*jti).second, empty_tuple, failed );
\r
191 otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );
\r
193 otup.channel = output_channel;
\r
194 result.push_back(otup);
\r
197 // end outer join processing
\r
// Release the key created on the heap by func.create_key() in process_join(),
// then the stored tuple.
199 delete((*jti).first);
\r
200 (*jti).second.free_tuple();
\r
// Presumably the else branch of the join_type test: no unmatched output for
// this side, so just release keys and tuples.
203 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
\r
204 delete((*jti).first);
\r
205 (*jti).second.free_tuple();
\r
209 join_tbl[i].clear(); join_tbl[i].rehash();
\r
// @param schema_handle0, schema_handle1  schema handles for channels 0 and 1.
//        They are passed to the functor and kept in sch0/sch1 for log messages.
// @param jtype       JOIN_OP_* code (its assignment to join_type is not
//                    visible in this view).
// @param name        operator name forwarded to base_operator.
// @param size_limit  soft per-queue limit consulted by get_blocked_status().
215 join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {
\r
217 max_input_ts[0] = NULL; max_input_ts[1] = NULL;
\r
// max_input_tuple[i] always stores its data in the fixed in-object buffer.
218 max_input_tuple[0].data = max_input_tuple_data[0];
\r
219 max_input_tuple[1].data = max_input_tuple_data[1];
\r
// NOTE(review): curr_ts is allocated with new and no matching delete is
// visible in this view; confirm a destructor releases it (and max_input_ts[]).
221 curr_ts = new timestamp();
\r
222 curr_ts_valid = false;
\r
224 soft_queue_size_limit = size_limit;
\r
226 sch0=schema_handle0;
\r
227 sch1=schema_handle1;
\r
228 n_calls=0; n_iters=0; n_eqk=0;
\r
// Accepts one tuple from channel 0 or 1 and appends any join output and
// temporal tuples to result.
// Returns 0 on the visible early-exit paths (timestamp could not be loaded).
// The remaining return statements are not visible in this view.
233 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
\r
234 bool do_join_update = false;
\r
238 // Dummy tuple for outer join processing.
\r
// NOTE(review): empty_tuple is not used by the visible lines of this function.
239 host_tuple empty_tuple;
\r
240 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
\r
// Only channels 0 and 1 are valid for a two-way join.
243 if (tup.channel > 1) {
\r
244 gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);
\r
// Temporal tuples still advance timestamps below but are never queued for joining.
248 bool is_temp_tuple = func.temp_status_received(tup);
\r
250 // Ensure that the queue ts is initialized.
\r
251 if(max_input_ts[tup.channel] == NULL){
\r
252 max_input_ts[tup.channel] = new timestamp();
\r
253 if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
\r
255 delete max_input_ts[tup.channel];
\r
256 max_input_ts[tup.channel] = NULL;
\r
257 return(0); // can't load ts -- bail out.
\r
// Once both channels have a timestamp, seed curr_ts from one of them and mark
// it valid. The qcmp test choosing between the two loads is not visible here.
260 if( max_input_ts[1-tup.channel]){
\r
261 int qcmp = compare_qts();
\r
263 func.load_ts_from_ts(curr_ts, max_input_ts[0]);
\r
265 func.load_ts_from_ts(curr_ts, max_input_ts[1]);
\r
267 curr_ts_valid = true;
\r
271 // reject "out of order" tuple - silently.
\r
// tup_ts is declared on a line not visible in this view.
273 if(! func.load_ts_from_tup(&tup_ts,tup)){
\r
275 return(0); // can't load ts -- bail out.
\r
278 int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
\r
279 if (tup_order < 0){
\r
// NOTE(review): the comment above says "silently", yet this prints to stdout;
// the channel-number check uses gslog instead.
280 printf("out of order ts.\n");
\r
283 // even for out of order temporal tuples we need to post new temporal tuple
\r
284 if (is_temp_tuple) {
\r
285 host_tuple temp_tup;
\r
286 temp_tup.channel = output_channel;
\r
// Posted only when get_temp_status() returns 0.
287 if (!get_temp_status(temp_tup))
\r
288 result.push_back(temp_tup);
\r
293 // Update max if larger
\r
295 func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
\r
297 // save the content of the max tuple
\r
298 max_input_tuple[tup.channel].channel = tup.channel;
\r
299 max_input_tuple[tup.channel].tuple_size = tup.tuple_size;
\r
// NOTE(review): copies into a fixed MAX_TUPLE_SIZE buffer with no visible
// bound check on tup.tuple_size.
300 memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);
\r
302 // do_join_update = true;
\r
305 // Add to input queue if it passes the prefilter.
\r
306 if(!is_temp_tuple && func.apply_prefilter(tup)){
\r
307 if(append_tuple(tup,tup.channel) == 1){
\r
308 do_join_update = true; // added tuple to empty queue
\r
314 // If status changed, apply tuples to join.
\r
315 // (updated max time, added tuple to empty queue)
\r
317 // clear queues, advance curr_ts
\r
// Both channels have moved past curr_ts: flush the hash tables, emitting
// outer-join results.
318 if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){
\r
319 process_outer_join(result);
\r
// Advance curr_ts to the selected queue's timestamp (front tuple, or saved max
// when empty). minq is set on lines not visible here, presumably from
// compare_qts().
323 if(compare_qts() > 0)
\r
325 if(input_queue[minq].empty())
\r
326 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
\r
328 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
\r
331 // Process any tuples to be joined.
\r
332 process_join(result);
\r
335 // post new temporal tuple
\r
337 if(is_temp_tuple) {
\r
338 host_tuple temp_tup;
\r
339 temp_tup.channel = output_channel;
\r
340 if (!get_temp_status(temp_tup))
\r
341 result.push_back(temp_tup);
\r
// Flushes both hash tables (emitting outer-join results). It then advances
// curr_ts to the selected queue's timestamp and joins the tuples queued at
// that timestamp. minq and the return statement are on lines not visible in
// this view.
// NOTE(review): if no tuple ever arrived on channel minq, max_input_ts[minq]
// is NULL and would be passed to load_ts_from_ts(); confirm callers exclude this.
347 int flush(list<host_tuple>& result) {
\r
349 process_outer_join(result);
\r
352 if(compare_qts() > 0)
\r
355 if(input_queue[minq].empty())
\r
356 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
\r
358 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
\r
360 process_join(result);
\r
// Forwards the query parameter block (value, sz) to the functor.
// The return statement is not visible in this view.
365 int set_param_block(int sz, void * value) {
\r
366 func.set_param_block(sz, value);
\r
// Builds a temporal tuple in result from channel minq (0, or 1 when
// compare_qts() > 0). It uses that channel's front queued tuple, or its saved
// max tuple when the queue is empty.
// @return value of func.create_temp_status_tuple(); callers post result only
//         when it returns 0.
371 int get_temp_status(host_tuple& result) {
\r
372 // temp tuple timestamp should be minimum between
\r
373 // minimums of all input queues
\r
375 // find the input stream with the minimum lower bound of the timestamp
\r
376 int qcmp = compare_qts();
\r
377 int minq = 0; if(qcmp>0) minq = 1;
\r
379 host_tuple empty_tuple;
\r
380 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
\r
// NOTE(review): a C++ reference cannot be re-bound. left_tuple and right_tuple
// both alias empty_tuple, so the assignments below copy into empty_tuple, and
// both arguments passed to create_temp_status_tuple() refer to the same
// object. Confirm this is intended.
381 host_tuple& left_tuple = empty_tuple;
\r
382 host_tuple& right_tuple = empty_tuple;
\r
// The conditions selecting which of the two blocks below runs are on lines not
// visible in this view; both read channel minq.
385 if(max_input_ts[minq]) {
\r
386 if (input_queue[minq].empty())
\r
387 left_tuple = max_input_tuple[minq];
\r
389 left_tuple = input_queue[minq].front();
\r
392 if(max_input_ts[minq]) {
\r
393 if (input_queue[minq].empty())
\r
394 right_tuple = max_input_tuple[minq];
\r
396 right_tuple = input_queue[minq].front();
\r
400 result.channel = output_channel;
\r
401 return func.create_temp_status_tuple(left_tuple, right_tuple, result);
\r
// Reports which input is over its soft queue limit: 0 if channel 0's queue
// exceeds soft_queue_size_limit, otherwise 1 if channel 1's does.
// The fallback return for "not blocked" is on a line not visible in this view.
405 int get_blocked_status () {
\r
406 if(input_queue[0].size() > soft_queue_size_limit) return(0);
\r
407 if(input_queue[1].size() > soft_queue_size_limit) return(1);
\r
// Memory footprint reported by the operator: the footprint of both join
// tables plus queue_mem (bytes of tuples waiting in the input queues).
411 unsigned int get_mem_footprint() {
\r
412 return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;
\r
416 #endif // JOIN_EQ_HASH_OPERATOR_H
\r