fd8bf00714112281b6c9e4a70d104aa2fd74bd72
[com/gs-lite.git] / include / hfta / join_eq_hash_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef JOIN_EQ_HASH_OPERATOR_H
17 #define JOIN_EQ_HASH_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include"hash_table.h"
23 using namespace std;
24
25 #include <stdio.h>
26
// Join-type codes, presumably what callers pass as the operator's `jtype`
// argument (TODO confirm at call sites).  They behave as bit flags:
// join_eq_hash_operator tests (join_type & 1) to keep unmatched channel-0
// (left) tuples and (join_type & 2) to keep unmatched channel-1 (right)
// tuples, so the full outer join is LEFT | RIGHT.
#define JOIN_OP_INNER_JOIN        0
#define JOIN_OP_LEFT_OUTER_JOIN   1
#define JOIN_OP_RIGHT_OUTER_JOIN  2
#define JOIN_OP_OUTER_JOIN        3

// Capacity in bytes of each per-channel buffer that snapshots the tuple
// carrying the largest timestamp seen so far on that channel.
#define MAX_TUPLE_SIZE 10240
34
35 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
36 class join_eq_hash_operator : public base_operator {
37 private :
38         //      type of join : inner vs. outer
39         unsigned int join_type;
40 int n_calls, n_iters, n_eqk;
41
42         // for tracing
43         int sch0, sch1;
44
45         // list of tuples from one of the channel waiting to be compared
46         // against tuple from the other channel
47         // Normally at least one should be empty after processing accept_tuple
48         list<host_tuple> input_queue[2];
49
50 //              Admission control timestamp objects
51         timestamp *max_input_ts[2];     // largest timestamp received on this channel
52                                                                 // perhaps from a temporal tuple
53         timestamp *curr_ts;                     // current ts being processed.
54         bool curr_ts_valid;                     // both channels have a ts so curr_ts has been
55                                                                 // assigned a value
56
57         bool hash_empty;                        // always true, seems an artifact
58
59
60 //      max tuples received from input channels
61         char max_input_tuple_data[2][MAX_TUPLE_SIZE];
62         host_tuple max_input_tuple[2];
63
64 //              The hash tables for the join algorithm
65         hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];
66
67
68         // comparator object used to provide methods for performing the joins.
69         join_eq_hash_functor func;
70
71         // soft limit on queue size - we consider operator blocked on its input
72         // whenever we reach this soft limit (not used anymore)
73         size_t soft_queue_size_limit;
74
75 //                      For matching on join hash key
76         equal_func equal_key;
77
78         // memory footprint for the join queues in bytes
79         unsigned int queue_mem;
80
81
82         // appends tuple to the end of the one of the input queues
83         // if tuple is stack resident, makes it heap resident
84         int append_tuple(host_tuple& tup, int q) {
85                 int ret = input_queue[q].empty() ? 1 : 2;
86                 if (!tup.heap_resident) {
87                         char* data = (char*)malloc(tup.tuple_size);
88                         memcpy(data, tup.data, tup.tuple_size);
89                         tup.data = data;
90                         tup.heap_resident = true;
91                 }
92                 input_queue[q].push_back(tup);
93                 queue_mem += tup.tuple_size;
94                 return ret;
95         }
96
97 //              -1 if input queue i has smaller ts, 0 it equal, 1 if curr_ts is smaller
98         int compare_qts_to_hashts(int i){
99                 timestamp tmp_ts;
100                 if(max_input_ts[i] == NULL) return(-1);
101 //printf("compare_qts_to_hashts channel %d: ",i);
102                 if(input_queue[i].empty())
103                         return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
104                 func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
105                 return(func.compare_ts_with_ts(&tmp_ts, curr_ts));
106         }
107
108 //              -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal
109         int compare_qts(){
110                 if(max_input_ts[0] == NULL) return(-1);
111                 if(max_input_ts[1] == NULL) return(1);
112                 timestamp tmp_lts, tmp_rts, *lts,*rts;
113
114                 if(input_queue[0].empty()){
115                         lts = max_input_ts[0];
116                 }else{
117                         func.load_ts_from_tup(&tmp_lts, input_queue[0].front());
118                         lts = &tmp_lts;
119                 }
120
121                 if(input_queue[1].empty()){
122                         rts = max_input_ts[1];
123                 }else{
124                         func.load_ts_from_tup(&tmp_rts, input_queue[1].front());
125                         rts = &tmp_rts;
126                 }
127
128 //printf("compare_qts : ");
129                 return(func.compare_ts_with_ts(lts,rts));
130         }
131
132         int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
133                 timestamp tmp_ts;
134                 func.load_ts_from_tup(&tmp_ts, tup);
135 //printf("compare_tup_with_ts channel %d: ",tup.channel);
136                 return(func.compare_ts_with_ts(&tmp_ts, ts));
137         }
138
139         void process_join(list<host_tuple>& result){
140           int i;
141           for(i=0;i<2;++i){
142 //printf("\tprocess join channel %d input q len is %lu\n",i, input_queue[i].size());
143                 while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
144 //                              apply tuples to join
145                         int other = 1-i;        // the other channel
146                         bool failed;
147
148 //                                      Get tuple from list
149                         host_tuple qtup = input_queue[i].front();
150                         input_queue[i].pop_front();
151                         queue_mem -= qtup.tuple_size;
152
153 //                                              Put it into its join table
154                         hashkey *qtup_key = func.create_key(qtup,failed); // on heap
155                         if(failed){
156                                 qtup.free_tuple();
157                                 continue;
158                         }
159                         join_tbl[i].insert(qtup_key, qtup);
160
161 //                                              Join with matching tuples in other table.
162
163                         typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);
164                         while(jti != join_tbl[other].end()){
165                                 if(equal_key((*jti).first, qtup_key)){
166                                   host_tuple otup;
167                                   if(i==0)
168                                     otup = func.create_output_tuple( qtup, (*jti).second, failed );
169                                   else
170                                     otup = func.create_output_tuple( (*jti).second, qtup, failed );
171                                   if(!failed){
172                                         otup.channel = output_channel;
173                                         result.push_back(otup);
174                                         qtup_key->touch();
175                                         (*jti).first->touch();
176                                   }
177                                 }
178                                 jti = jti.next();
179                         }
180                 }
181           }
182         }
183
184   void process_outer_join(list<host_tuple>& result){
185         int i;
186         bool failed;
187     host_tuple empty_tuple;
188         empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
189
190 //printf("Processing outer join\n");
191
192         hash_empty = true;
193         typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
194         for(i=0;i<2;++i){
195                 if(!join_tbl[i].empty()){
196                         if(join_type & (i+1)){
197                                 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
198 //              Outer join processing
199                                         if( ! (*jti).first->is_touched() ){
200                                           host_tuple otup;
201                                           if(i==0)
202                                         otup = func.create_output_tuple(  (*jti).second, empty_tuple, failed );
203                                           else
204                                         otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );
205                                           if(!failed){
206                                                 otup.channel = output_channel;
207                                                 result.push_back(otup);
208                                           }
209                                         }
210 //              end outer join processing
211
212                                         delete((*jti).first);
213                                         (*jti).second.free_tuple();
214                                 }
215                         }else{
216                                 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
217                                         delete((*jti).first);
218                                         (*jti).second.free_tuple();
219                                 }
220                         }
221                 }
222                 join_tbl[i].clear(); join_tbl[i].resize();
223         }
224
225   }
226
227 public:
228         join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {
229                 join_type = jtype;
230                 max_input_ts[0] = NULL; max_input_ts[1] = NULL;
231                 max_input_tuple[0].data = max_input_tuple_data[0];
232                 max_input_tuple[1].data = max_input_tuple_data[1];
233
234                 curr_ts =  new timestamp();
235                 curr_ts_valid = false;
236                 hash_empty = true;
237                 soft_queue_size_limit = size_limit;
238
239                 sch0=schema_handle0;
240                 sch1=schema_handle1;
241 n_calls=0; n_iters=0; n_eqk=0;
242
243                 queue_mem = 0;
244         }
245
246         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
247                 bool do_join_update = false;
248                 int i;
249                 bool failed;
250
251 //                      Dummy tuple for outer join processing.
252                 host_tuple empty_tuple;
253                 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
254
255
256                 if (tup.channel > 1) {
257                         gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);
258                         return 0;
259                 }
260
261                 bool is_temp_tuple = func.temp_status_received(tup);
262
263 //                      Ensure that the queue ts is initialized.
264                 if(max_input_ts[tup.channel] == NULL){
265 //printf("Loading channel %d\n",tup.channel);
266                         max_input_ts[tup.channel] = new timestamp();
267                         if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
268                                 tup.free_tuple();
269                                 delete max_input_ts[tup.channel];
270                                 max_input_ts[tup.channel] = NULL;
271                                 return(0);      // can't load ts -- bail out.
272                         }
273
274                         if( max_input_ts[1-tup.channel]){
275                                 int qcmp = compare_qts();
276                                 if(qcmp<=0){
277                                         func.load_ts_from_ts(curr_ts, max_input_ts[0]);
278                                 }else{
279                                         func.load_ts_from_ts(curr_ts, max_input_ts[1]);
280                                 }
281                                 curr_ts_valid = true;
282                         }
283                 }
284
285 // reject "out of order" tuple - silently.
286                 timestamp tup_ts;
287                 if(! func.load_ts_from_tup(&tup_ts,tup)){
288                         tup.free_tuple();
289                         return(0);      // can't load ts -- bail out.
290                 }
291
292 //printf("accept_tuple channel=%d: ",tup.channel);
293             int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
294                 if (tup_order < 0){
295                         printf("%s: out of order ts, channel=%d.\n", op_name, tup.channel);
296                         tup.free_tuple();
297
298                         // even for out of order temporal tuples we need to post new temporal tuple
299                         if (is_temp_tuple) {
300                                 host_tuple temp_tup;
301                                 temp_tup.channel = output_channel;
302                                 if (!get_temp_status(temp_tup))
303                                         result.push_back(temp_tup);
304                         }
305                         return  0;
306                 }
307
308 //      Update max if larger
309                 if(tup_order > 0){
310 //printf("Loading channel %d\n",tup.channel);
311                         func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
312
313                         // save the content of the max tuple
314                         max_input_tuple[tup.channel].channel = tup.channel;
315                         max_input_tuple[tup.channel].tuple_size = tup.tuple_size;
316                         memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);
317
318 //                      do_join_update = true;
319                 }
320
321 //              Add to input queue if it passes the prefilter.
322                 if(!is_temp_tuple && func.apply_prefilter(tup)){
323                         if(append_tuple(tup,tup.channel) == 1){
324                                 do_join_update = true;  // added tuple to empty queue
325                         }
326                 }else{
327                         tup.free_tuple();
328                 }
329
330 //              If status changed, apply tuples to join.
331 //              (updated max time, added tuple to empty queue)
332
333 //              clear queues, advance curr_ts
334                         if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){
335                                 process_outer_join(result);
336
337
338                           int minq = 0;
339                           if(compare_qts() > 0)
340                                 minq = 1;
341                           if(input_queue[minq].empty())
342                                 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
343                           else
344                                 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
345                         }
346
347 //                              Process any tuples to be joined.
348                                         process_join(result);
349
350
351                 // post new temporal tuple
352
353                 if(is_temp_tuple) {
354                         host_tuple temp_tup;
355                         temp_tup.channel = output_channel;
356                         if (!get_temp_status(temp_tup))
357                                 result.push_back(temp_tup);
358                 }
359
360                 return 0;
361         }
362
363         int flush(list<host_tuple>& result) {
364 //printf("Calling flush\n");
365
366                 process_outer_join(result);
367
368                 int minq = 0;
369                 if(compare_qts() > 0)
370                         minq = 1;
371
372                 if(input_queue[minq].empty())
373                         func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
374                 else
375                         func.load_ts_from_tup(curr_ts,input_queue[minq].front());
376
377                 process_join(result);
378
379                 return 0;
380         }
381
382         int set_param_block(int sz, void * value) {
383                 func.set_param_block(sz, value);
384                 return 0;
385         }
386
387
388         int get_temp_status(host_tuple& result) {
389 //                      temp tuple timestamp should be minimum between
390 //                      minimums of all input queues
391
392                 // find the inputstream in minimum lowebound of the timestamp
393                 int qcmp = compare_qts();
394                 int minq = 0; if(qcmp>0) minq = 1;
395
396                 timestamp left_ts, right_ts;
397                 timestamp *left_ts_ptr = &left_ts;
398                 timestamp *right_ts_ptr = &right_ts;
399
400                 
401 /*
402                 if (input_queue[0].empty()){
403                         printf("L=max_ts, ");
404                         left_ts_ptr = max_input_ts[0];
405                 }else{
406                         printf("L=q, ");
407                         func.load_ts_from_tup(left_ts_ptr, input_queue[0].front());
408                 }
409                 
410                 if (input_queue[1].empty()){
411                         printf("R=max_ts, ");
412                         right_ts_ptr = max_input_ts[1];
413                 }else{
414                         printf("L=q, ");
415                         func.load_ts_from_tup(right_ts_ptr, input_queue[1].front());
416                 }
417 */
418                 if(curr_ts_valid){
419                         left_ts_ptr = curr_ts;
420                         right_ts_ptr = curr_ts;
421                 }else{
422 //printf("curr_ts invalid\n");
423                         left_ts_ptr = NULL;
424                         right_ts_ptr = NULL;
425                 }
426
427                 result.channel = output_channel;
428                 return func.create_temp_status_tuple(left_ts_ptr, right_ts_ptr, result);
429         }
430
431
432         int get_blocked_status () {
433                 if(input_queue[0].size() > soft_queue_size_limit) return(0);
434                 if(input_queue[1].size() > soft_queue_size_limit) return(1);
435                 return -1;
436         }
437
438         unsigned int get_mem_footprint() {
439                 return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;
440         }
441 };
442
443 #endif  // JOIN_EQ_HASH_OPERATOR_H
444