1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef JOIN_EQ_HASH_OPERATOR_H
17 #define JOIN_EQ_HASH_OPERATOR_H
19 #include "host_tuple.h"
20 #include "base_operator.h"
22 #include"hash_table.h"
// Join type codes stored in join_eq_hash_operator::join_type.
// process_outer_join() tests `join_type & (i+1)` for input channel i, so the
// value acts as a bitmask: bit 0 enables outer-join output for channel 0 and
// bit 1 enables it for channel 1.
27 #define JOIN_OP_INNER_JOIN 0
28 #define JOIN_OP_LEFT_OUTER_JOIN 1
29 #define JOIN_OP_RIGHT_OUTER_JOIN 2
30 #define JOIN_OP_OUTER_JOIN 3
// Size in bytes of each per-channel buffer (max_input_tuple_data) that holds a
// copy of the tuple with the largest timestamp seen on that channel.
// NOTE(review): accept_tuple memcpy's tup.tuple_size bytes into that buffer;
// confirm that tuples larger than MAX_TUPLE_SIZE cannot reach it.
33 #define MAX_TUPLE_SIZE 10240
// Two-input equality join operator built on per-channel hash tables.
// Tuples arriving on channels 0 and 1 are queued; while a queued tuple's
// timestamp equals curr_ts it is moved into its channel's hash table and
// joined against the other channel's table (see process_join).
//
// Template parameters, as they are used in this class:
//   join_eq_hash_functor    - type of `func`; supplies timestamp loading and
//                             comparison, key creation, prefiltering, and
//                             output / temporal tuple construction
//   timestamp               - timestamp value type; created with `new timestamp()`
//   hashkey                 - join key type; stored by pointer in the hash
//                             tables and exposes touch() / is_touched()
//   hasher_func, equal_func - hashing and equality functors given to hash_table
35 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
36 class join_eq_hash_operator : public base_operator {
38 // type of join (inner vs. outer); process_outer_join tests it as a per-channel bitmask
39 unsigned int join_type;
40 int n_calls, n_iters, n_eqk; // counters, zeroed in the constructor
45 // list of tuples from one of the channels waiting to be compared
46 // against tuples from the other channel
47 // Normally at least one should be empty after processing accept_tuple
48 list<host_tuple> input_queue[2];
50 // Admission control timestamp objects
51 timestamp *max_input_ts[2]; // largest timestamp received on this channel
52 // perhaps from a temporal tuple; NULL until the channel delivers its first timestamp
53 timestamp *curr_ts; // current ts being processed.
54 bool curr_ts_valid; // true once both channels have a ts, so curr_ts has been set (see accept_tuple)
57 bool hash_empty; // always true, seems an artifact
60 // copy of the tuple carrying the largest timestamp seen on each input channel;
// max_input_tuple[c].data is pointed at max_input_tuple_data[c] by the constructor
61 char max_input_tuple_data[2][MAX_TUPLE_SIZE];
62 host_tuple max_input_tuple[2];
64 // The hash tables for the join algorithm, one per input channel
65 hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];
68 // comparator object used to provide methods for performing the joins.
69 join_eq_hash_functor func;
71 // soft limit on queue size - we consider operator blocked on its input
72 // whenever we reach this soft limit (marked "not used anymore", although
// get_blocked_status still compares the queue sizes against it)
73 size_t soft_queue_size_limit;
75 // For matching on join hash key
78 // memory footprint for the join queues in bytes; increased in append_tuple,
// decreased in process_join.
// NOTE(review): confirm this counter is initialized before first use.
79 unsigned int queue_mem;
82 // Appends a tuple to the end of input queue q.
83 // If the tuple is not heap resident, its payload is first copied to the heap.
// Parameters:
//   tup - tuple to enqueue; modified in place when it has to be made heap resident
//   q   - index of the input queue (channel) to append to
// `ret` records whether the queue was empty before the append (1) or not (2);
// accept_tuple compares this function's result against 1 to detect the
// "added tuple to empty queue" case.
84 int append_tuple(host_tuple& tup, int q) {
85 int ret = input_queue[q].empty() ? 1 : 2;
86 if (!tup.heap_resident) {
// duplicate the payload into malloc'ed storage and mark the tuple heap resident
87 char* data = (char*)malloc(tup.tuple_size);
88 memcpy(data, tup.data, tup.tuple_size);
90 tup.heap_resident = true;
// the queue stores a copy of the host_tuple descriptor
92 input_queue[q].push_back(tup);
// account for the queued payload in the operator's memory footprint
93 queue_mem += tup.tuple_size;
97 // Compares the timestamp of input channel i against curr_ts:
// -1 if input queue i has the smaller ts, 0 if equal, 1 if curr_ts is smaller.
// Also returns -1 when channel i has not yet received any timestamp.
// The channel's timestamp is taken from the tuple at the front of its queue,
// or from max_input_ts[i] when the queue is empty.
98 int compare_qts_to_hashts(int i){
100 if(max_input_ts[i] == NULL) return(-1);
101 //printf("compare_qts_to_hashts channel %d: ",i);
102 if(input_queue[i].empty())
103 return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
// queue not empty: use the timestamp of the oldest queued tuple
104 func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
105 return(func.compare_ts_with_ts(&tmp_ts, curr_ts));
108 // Compares the timestamps of the two input channels:
// -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal.
// A channel that has not yet received a timestamp is treated as the smaller
// one (channel 0 is checked first).
// For each channel the timestamp is that of the tuple at the front of its
// queue, or max_input_ts[] for that channel when the queue is empty.
110 if(max_input_ts[0] == NULL) return(-1);
111 if(max_input_ts[1] == NULL) return(1);
112 timestamp tmp_lts, tmp_rts, *lts,*rts;
// left (channel 0) timestamp
114 if(input_queue[0].empty()){
115 lts = max_input_ts[0];
117 func.load_ts_from_tup(&tmp_lts, input_queue[0].front());
// right (channel 1) timestamp
121 if(input_queue[1].empty()){
122 rts = max_input_ts[1];
124 func.load_ts_from_tup(&tmp_rts, input_queue[1].front());
128 //printf("compare_qts : ");
129 return(func.compare_ts_with_ts(lts,rts));
// Loads the timestamp of `tup` through the functor and compares it with `ts`.
// Returns the result of func.compare_ts_with_ts(tuple timestamp, ts);
// process_join treats a result of 0 as "same timestamp".
132 int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
134 func.load_ts_from_tup(&tmp_ts, tup);
135 //printf("compare_tup_with_ts channel %d: ",tup.channel);
136 return(func.compare_ts_with_ts(&tmp_ts, ts));
// Moves queued tuples whose timestamp equals curr_ts into the join tables and
// emits the join results.
// For input channel i, each such tuple is removed from the queue, inserted
// into join_tbl[i] under a newly created key, and combined with every entry of
// the other channel's table whose key compares equal.
// Parameters:
//   result - output list; joined tuples are appended with channel set to
//            output_channel
139 void process_join(list<host_tuple>& result){
142 //printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
// consume tuples only while curr_ts is valid and the front tuple carries exactly curr_ts
143 while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
144 // apply tuples to join
145 int other = 1-i; // the other channel
148 // Get tuple from list
149 host_tuple qtup = input_queue[i].front();
150 input_queue[i].pop_front();
// the tuple leaves the queue, so it no longer counts toward queue memory
151 queue_mem -= qtup.tuple_size;
153 // Put it into its join table
154 hashkey *qtup_key = func.create_key(qtup,failed); // on heap
159 join_tbl[i].insert(qtup_key, qtup);
161 // Join with matching tuples in other table.
163 typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);
164 while(jti != join_tbl[other].end()){
165 if(equal_key((*jti).first, qtup_key)){
// The two calls differ only in argument order; presumably the order is
// selected by the channel qtup arrived on - confirm against the condition.
168 otup = func.create_output_tuple( qtup, (*jti).second, failed );
170 otup = func.create_output_tuple( (*jti).second, qtup, failed );
172 otup.channel = output_channel;
173 result.push_back(otup);
// mark the matched key; process_outer_join emits only untouched entries
175 (*jti).first->touch();
// Drains the join tables, emitting outer-join results on the way.
// For a channel i whose bit (i+1) is set in join_type, every table entry whose
// key was never touch()ed produces an output tuple built from the stored tuple
// and an empty tuple. For every entry, the key is deleted and the stored tuple
// freed; the table is then cleared and resized.
// Parameters:
//   result - output list; outer-join tuples are appended with channel set to
//            output_channel
184 void process_outer_join(list<host_tuple>& result){
// zero-sized stand-in for the missing side of an unmatched tuple
187 host_tuple empty_tuple;
188 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
190 //printf("Processing outer join\n");
193 typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
195 if(!join_tbl[i].empty()){
// outer-join output is produced only for channels enabled in join_type
196 if(join_type & (i+1)){
197 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
198 // Outer join processing
199 if( ! (*jti).first->is_touched() ){
// stored tuple on one side, empty tuple on the other; the two calls differ
// only in which side is empty
202 otup = func.create_output_tuple( (*jti).second, empty_tuple, failed );
204 otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );
206 otup.channel = output_channel;
207 result.push_back(otup);
210 // end outer join processing
// release the heap-allocated key and the stored tuple
212 delete((*jti).first);
213 (*jti).second.free_tuple();
// second sweep: release keys and tuples without emitting any output
216 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
217 delete((*jti).first);
218 (*jti).second.free_tuple();
222 join_tbl[i].clear(); join_tbl[i].resize();
// Constructs the join operator.
// Parameters:
//   schema_handle0, schema_handle1 - schema handles, forwarded to the functor `func`
//   jtype      - join type code
//   name       - operator name, forwarded to base_operator
//   size_limit - soft limit on input queue length (default 10000), stored in
//                soft_queue_size_limit
228 join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {
// no timestamp has been seen on either channel yet
230 max_input_ts[0] = NULL; max_input_ts[1] = NULL;
// back each saved "max" tuple with its fixed-size member buffer
231 max_input_tuple[0].data = max_input_tuple_data[0];
232 max_input_tuple[1].data = max_input_tuple_data[1];
// curr_ts is heap allocated here and stays invalid until accept_tuple sets it.
// NOTE(review): confirm curr_ts and max_input_ts[] are released on destruction.
234 curr_ts = new timestamp();
235 curr_ts_valid = false;
237 soft_queue_size_limit = size_limit;
241 n_calls=0; n_iters=0; n_eqk=0;
// Accepts one input tuple (regular or temporal) from channel tup.channel and
// appends any resulting output tuples to `result`.
// Visible steps:
//   1. report tuples whose channel number is greater than 1
//   2. on the first tuple of a channel, create that channel's max timestamp;
//      once both channels have one, initialize curr_ts and mark it valid
//   3. load the tuple's timestamp and compare it with the channel's maximum
//      (out-of-order handling), posting a temporal tuple where needed
//   4. update the channel's maximum timestamp and keep a copy of the tuple
//   5. queue non-temporal tuples that pass the prefilter
//   6. when both channels have moved past curr_ts, flush the join tables via
//      process_outer_join and advance curr_ts; then run process_join
//   7. post a new temporal tuple
// Parameters:
//   tup    - incoming tuple
//   result - output list receiving joined and temporal tuples
// Returns 0 when the tuple's timestamp cannot be loaded.
246 int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
247 bool do_join_update = false;
251 // Dummy tuple for outer join processing.
252 host_tuple empty_tuple;
253 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
// this is a two-way join: only channels 0 and 1 are legal
256 if (tup.channel > 1) {
257 gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);
261 bool is_temp_tuple = func.temp_status_received(tup);
263 // Ensure that the queue ts is initialized.
264 if(max_input_ts[tup.channel] == NULL){
265 //printf("Loading channel %d\n",tup.channel);
266 max_input_ts[tup.channel] = new timestamp();
267 if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
// roll back the allocation so the channel still counts as "no timestamp yet"
269 delete max_input_ts[tup.channel];
270 max_input_ts[tup.channel] = NULL;
271 return(0); // can't load ts -- bail out.
// the other channel already has a timestamp: curr_ts can now be initialized
// from one of the two channel maxima, selected using qcmp
274 if( max_input_ts[1-tup.channel]){
275 int qcmp = compare_qts();
277 func.load_ts_from_ts(curr_ts, max_input_ts[0]);
279 func.load_ts_from_ts(curr_ts, max_input_ts[1]);
281 curr_ts_valid = true;
285 // reject "out of order" tuple - silently.
287 if(! func.load_ts_from_tup(&tup_ts,tup)){
289 return(0); // can't load ts -- bail out.
292 //printf("accept_tuple channel=%d: ",tup.channel);
// order of this tuple's timestamp relative to the largest one seen on its channel
293 int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
295 // printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
298 // even for out of order temporal tuples we need to post new temporal tuple
301 temp_tup.channel = output_channel;
// get_temp_status returns the functor's status; the tuple is posted when it is 0
302 if (!get_temp_status(temp_tup))
303 result.push_back(temp_tup);
308 // Update max if larger
310 //printf("Loading channel %d\n",tup.channel);
311 func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
313 // save the content of the max tuple
314 max_input_tuple[tup.channel].channel = tup.channel;
315 max_input_tuple[tup.channel].tuple_size = tup.tuple_size;
// NOTE(review): the destination buffer holds MAX_TUPLE_SIZE bytes; confirm
// tup.tuple_size can never exceed that.
316 memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);
318 // do_join_update = true;
321 // Add to input queue if it passes the prefilter.
322 if(!is_temp_tuple && func.apply_prefilter(tup)){
// append_tuple yields 1 when the queue was empty before this tuple was added
323 if(append_tuple(tup,tup.channel) == 1){
324 do_join_update = true; // added tuple to empty queue
330 // If status changed, apply tuples to join.
331 // (updated max time, added tuple to empty queue)
333 // clear queues, advance curr_ts
// both channels are strictly ahead of curr_ts: nothing more can match the
// current tables, so flush them and move curr_ts forward
334 if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){
335 process_outer_join(result);
// choose the channel with the smaller timestamp (minq) as the source of the
// new curr_ts
339 if(compare_qts() > 0)
341 if(input_queue[minq].empty())
342 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
344 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
347 // Process any tuples to be joined.
348 process_join(result);
351 // post new temporal tuple
355 temp_tup.channel = output_channel;
356 if (!get_temp_status(temp_tup))
357 result.push_back(temp_tup);
// Flushes the operator: drains the join tables through process_outer_join,
// advances curr_ts to the timestamp of the channel selected as minq, and runs
// process_join on the queued tuples.
// Parameters:
//   result - output list receiving the tuples produced while flushing
// NOTE(review): when input_queue[minq] is empty, curr_ts is loaded from
// max_input_ts[minq], which is NULL until that channel has delivered a tuple;
// confirm this case is guarded.
363 int flush(list<host_tuple>& result) {
364 //printf("Calling flush\n");
366 process_outer_join(result);
// pick the channel with the smaller timestamp
369 if(compare_qts() > 0)
372 if(input_queue[minq].empty())
373 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
375 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
377 process_join(result);
// Forwards a parameter block to the join functor.
// Parameters:
//   sz    - size argument passed through to func.set_param_block
//   value - pointer argument passed through to func.set_param_block
382 int set_param_block(int sz, void * value) {
383 func.set_param_block(sz, value);
// Builds a temporal status tuple for the output channel.
// A left and a right timestamp are selected and handed to
// func.create_temp_status_tuple together with `result`.
// Parameters:
//   result - tuple to fill in; its channel is set to output_channel
// Returns the value returned by func.create_temp_status_tuple; callers in this
// class post the tuple when that value is 0.
// NOTE(review): the printf("L=max_ts, ") / printf("R=max_ts, ") calls below
// look like leftover debug output - confirm whether they are meant to be active.
388 int get_temp_status(host_tuple& result) {
389 // temp tuple timestamp should be minimum between
390 // minimums of all input queues
392 // find the input stream with the minimum lower bound of the timestamp
393 int qcmp = compare_qts();
394 int minq = 0; if(qcmp>0) minq = 1;
// by default the pointers refer to local timestamps that can be loaded from queued tuples
396 timestamp left_ts, right_ts;
397 timestamp *left_ts_ptr = &left_ts;
398 timestamp *right_ts_ptr = &right_ts;
// channel 0: empty queue -> channel maximum, otherwise timestamp of the front tuple
402 if (input_queue[0].empty()){
403 printf("L=max_ts, ");
404 left_ts_ptr = max_input_ts[0];
407 func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
// channel 1: same selection
410 if (input_queue[1].empty()){
411 printf("R=max_ts, ");
412 right_ts_ptr = max_input_ts[1];
415 func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
// alternative selection: use curr_ts for both sides
419 left_ts_ptr = curr_ts;
420 right_ts_ptr = curr_ts;
422 //printf("curr_ts invalid\n");
427 result.channel = output_channel;
428 return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
// Reports which input queue, if any, holds more tuples than
// soft_queue_size_limit: returns 0 or 1 for the first such channel
// (channel 0 is checked first).
432 int get_blocked_status () {
433 if(input_queue[0].size() > soft_queue_size_limit) return(0);
434 if(input_queue[1].size() > soft_queue_size_limit) return(1);
// Returns the operator's memory footprint: the footprints reported by both
// join hash tables plus queue_mem, the byte count of the tuples currently held
// in the input queues.
438 unsigned int get_mem_footprint() {
439 return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;
443 #endif // JOIN_EQ_HASH_OPERATOR_H