Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / join_eq_hash_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef JOIN_EQ_HASH_OPERATOR_H
17 #define JOIN_EQ_HASH_OPERATOR_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 #include"hash_table.h"
23 using namespace std;
24
25 #include <stdio.h>
26
// Values for the join-type (jtype) argument of join_eq_hash_operator.
// The operator tests the value as bit flags (join_type & (channel+1)):
// bit value 1 emits unmatched channel-0 (left) tuples, bit value 2 emits
// unmatched channel-1 (right) tuples, so JOIN_OP_OUTER_JOIN (3) emits both
// and JOIN_OP_INNER_JOIN (0) emits neither.
#define JOIN_OP_INNER_JOIN 0
#define JOIN_OP_LEFT_OUTER_JOIN 1
#define JOIN_OP_RIGHT_OUTER_JOIN 2
#define JOIN_OP_OUTER_JOIN 3


// Used in this header as the capacity (in bytes) of each per-channel buffer
// that keeps a copy of the tuple which last raised that channel's max
// timestamp.
#define MAX_TUPLE_SIZE 1024
34
35 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>
36 class join_eq_hash_operator : public base_operator {
37 private :
38         //      type of join : inner vs. outer
39         unsigned int join_type;
40 int n_calls, n_iters, n_eqk;
41
42         // for tracing
43         int sch0, sch1;
44
45         // list of tuples from one of the channel waiting to be compared
46         // against tuple from the other channel
47         list<host_tuple> input_queue[2];
48
49 //              Admission control timestamp objects
50         timestamp *max_input_ts[2], *curr_ts;
51         bool hash_empty, curr_ts_valid;
52
53 //      max tuples received from input channels
54         char max_input_tuple_data[2][MAX_TUPLE_SIZE];
55         host_tuple max_input_tuple[2];
56
57 //              The hash tables for the join algorithm
58         hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];
59
60
61         // comparator object used to provide methods for performing the joins.
62         join_eq_hash_functor func;
63
64         // soft limit on queue size - we consider operator blocked on its input
65         // whenever we reach this soft limit (not used anymore)
66         size_t soft_queue_size_limit;
67
68 //                      For matching on join hash key
69         equal_func equal_key;
70
71         // memory footprint for the join queues in bytes
72         unsigned int queue_mem;
73
74
75         // appends tuple to the end of the one of the input queues
76         // if tuple is stack resident, makes it heap resident
77         int append_tuple(host_tuple& tup, int q) {
78                 int ret = input_queue[q].empty() ? 1 : 2;
79                 if (!tup.heap_resident) {
80                         char* data = (char*)malloc(tup.tuple_size);
81                         memcpy(data, tup.data, tup.tuple_size);
82                         tup.data = data;
83                         tup.heap_resident = true;
84                 }
85                 input_queue[q].push_back(tup);
86                 queue_mem += tup.tuple_size;
87                 return ret;
88         }
89
90 //              -1 if input queue i has smaller ts, 0 it equal, 1 if curr_ts is smaller
91         int compare_qts_to_hashts(int i){
92                 timestamp tmp_ts;
93                 if(max_input_ts[i] == NULL) return(-1);
94                 if(input_queue[i].empty())
95                         return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));
96                 func.load_ts_from_tup(&tmp_ts,input_queue[i].front());
97                 return(func.compare_ts_with_ts(&tmp_ts, curr_ts));
98         }
99
100 //              -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal
101         int compare_qts(){
102                 if(max_input_ts[0] == NULL) return(-1);
103                 if(max_input_ts[1] == NULL) return(1);
104                 timestamp tmp_lts, tmp_rts, *lts,*rts;
105
106                 if(input_queue[0].empty()){
107                         lts = max_input_ts[0];
108                 }else{
109                         func.load_ts_from_tup(&tmp_lts, input_queue[0].front());
110                         lts = &tmp_lts;
111                 }
112
113                 if(input_queue[1].empty()){
114                         rts = max_input_ts[1];
115                 }else{
116                         func.load_ts_from_tup(&tmp_rts, input_queue[1].front());
117                         rts = &tmp_rts;
118                 }
119
120                 return(func.compare_ts_with_ts(lts,rts));
121         }
122
123         int compare_tup_with_ts(host_tuple &tup, timestamp *ts){
124                 timestamp tmp_ts;
125                 func.load_ts_from_tup(&tmp_ts, tup);
126                 return(func.compare_ts_with_ts(&tmp_ts, ts));
127         }
128
129         void process_join(list<host_tuple>& result){
130           int i;
131           for(i=0;i<2;++i){
132                 while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){
133 //                              apply tuples to join
134                         int other = 1-i;        // the other channel
135                         bool failed;
136
137 //                                      Get tuple from list
138                         host_tuple qtup = input_queue[i].front();
139                         input_queue[i].pop_front();
140                         queue_mem -= qtup.tuple_size;
141
142 //                                              Put it into its join table
143                         hashkey *qtup_key = func.create_key(qtup,failed); // on heap
144                         if(failed){
145                                 qtup.free_tuple();
146                                 continue;
147                         }
148                         join_tbl[i].insert(qtup_key, qtup);
149
150 //                                              Join with matching tuples in other table.
151
152                         typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);
153                         while(jti != join_tbl[other].end()){
154                                 if(equal_key((*jti).first, qtup_key)){
155                                   host_tuple otup;
156                                   if(i==0)
157                                     otup = func.create_output_tuple( qtup, (*jti).second, failed );
158                                   else
159                                     otup = func.create_output_tuple( (*jti).second, qtup, failed );
160                                   if(!failed){
161                                         otup.channel = output_channel;
162                                         result.push_back(otup);
163                                         qtup_key->touch();
164                                         (*jti).first->touch();
165                                   }
166                                 }
167                                 jti = jti.next();
168                         }
169                 }
170           }
171         }
172
173   void process_outer_join(list<host_tuple>& result){
174         int i;
175         bool failed;
176     host_tuple empty_tuple;
177         empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
178
179         hash_empty = true;
180         typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;
181         for(i=0;i<2;++i){
182                 if(!join_tbl[i].empty()){
183                         if(join_type & (i+1)){
184                                 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
185 //              Outer join processing
186                                         if( ! (*jti).first->is_touched() ){
187                                           host_tuple otup;
188                                           if(i==0)
189                                         otup = func.create_output_tuple(  (*jti).second, empty_tuple, failed );
190                                           else
191                                         otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );
192                                           if(!failed){
193                                                 otup.channel = output_channel;
194                                                 result.push_back(otup);
195                                           }
196                                         }
197 //              end outer join processing
198
199                                         delete((*jti).first);
200                                         (*jti).second.free_tuple();
201                                 }
202                         }else{
203                                 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){
204                                         delete((*jti).first);
205                                         (*jti).second.free_tuple();
206                                 }
207                         }
208                 }
209                 join_tbl[i].clear(); join_tbl[i].rehash();
210         }
211
212   }
213
214 public:
215         join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {
216                 join_type = jtype;
217                 max_input_ts[0] = NULL; max_input_ts[1] = NULL;
218                 max_input_tuple[0].data = max_input_tuple_data[0];
219                 max_input_tuple[1].data = max_input_tuple_data[1];
220
221                 curr_ts =  new timestamp();
222                 curr_ts_valid = false;
223                 hash_empty = true;
224                 soft_queue_size_limit = size_limit;
225
226                 sch0=schema_handle0;
227                 sch1=schema_handle1;
228 n_calls=0; n_iters=0; n_eqk=0;
229
230                 queue_mem = 0;
231         }
232
233         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
234                 bool do_join_update = false;
235                 int i;
236                 bool failed;
237
238 //                      Dummy tuple for outer join processing.
239                 host_tuple empty_tuple;
240                 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
241
242
243                 if (tup.channel > 1) {
244                         gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);
245                         return 0;
246                 }
247
248                 bool is_temp_tuple = func.temp_status_received(tup);
249
250 //                      Ensure that the queue ts is initialized.
251                 if(max_input_ts[tup.channel] == NULL){
252                         max_input_ts[tup.channel] = new timestamp();
253                         if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){
254                                 tup.free_tuple();
255                                 delete max_input_ts[tup.channel];
256                                 max_input_ts[tup.channel] = NULL;
257                                 return(0);      // can't load ts -- bail out.
258                         }
259
260                         if( max_input_ts[1-tup.channel]){
261                                 int qcmp = compare_qts();
262                                 if(qcmp<=0){
263                                         func.load_ts_from_ts(curr_ts, max_input_ts[0]);
264                                 }else{
265                                         func.load_ts_from_ts(curr_ts, max_input_ts[1]);
266                                 }
267                                 curr_ts_valid = true;
268                         }
269                 }
270
271 // reject "out of order" tuple - silently.
272                 timestamp tup_ts;
273                 if(! func.load_ts_from_tup(&tup_ts,tup)){
274                         tup.free_tuple();
275                         return(0);      // can't load ts -- bail out.
276                 }
277
278             int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);
279                 if (tup_order < 0){
280 printf("out of order ts.\n");
281                         tup.free_tuple();
282
283                         // even for out of order temporal tuples we need to post new temporal tuple
284                         if (is_temp_tuple) {
285                                 host_tuple temp_tup;
286                                 temp_tup.channel = output_channel;
287                                 if (!get_temp_status(temp_tup))
288                                         result.push_back(temp_tup);
289                         }
290                         return  0;
291                 }
292
293 //      Update max if larger
294                 if(tup_order > 0){
295                         func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);
296
297                         // save the content of the max tuple
298                         max_input_tuple[tup.channel].channel = tup.channel;
299                         max_input_tuple[tup.channel].tuple_size = tup.tuple_size;
300                         memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);
301
302 //                      do_join_update = true;
303                 }
304
305 //              Add to input queue if it passes the prefilter.
306                 if(!is_temp_tuple && func.apply_prefilter(tup)){
307                         if(append_tuple(tup,tup.channel) == 1){
308                                 do_join_update = true;  // added tuple to empty queue
309                         }
310                 }else{
311                         tup.free_tuple();
312                 }
313
314 //              If status changed, apply tuples to join.
315 //              (updated max time, added tuple to empty queue)
316
317 //              clear queues, advance curr_ts
318                         if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){
319                                 process_outer_join(result);
320
321
322                           int minq = 0;
323                           if(compare_qts() > 0)
324                                 minq = 1;
325                           if(input_queue[minq].empty())
326                                 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
327                           else
328                                 func.load_ts_from_tup(curr_ts,input_queue[minq].front());
329                         }
330
331 //                              Process any tuples to be joined.
332                                         process_join(result);
333
334
335                 // post new temporal tuple
336
337                 if(is_temp_tuple) {
338                         host_tuple temp_tup;
339                         temp_tup.channel = output_channel;
340                         if (!get_temp_status(temp_tup))
341                                 result.push_back(temp_tup);
342                 }
343
344                 return 0;
345         }
346
347         int flush(list<host_tuple>& result) {
348
349                 process_outer_join(result);
350
351                 int minq = 0;
352                 if(compare_qts() > 0)
353                         minq = 1;
354
355                 if(input_queue[minq].empty())
356                         func.load_ts_from_ts(curr_ts,max_input_ts[minq]);
357                 else
358                         func.load_ts_from_tup(curr_ts,input_queue[minq].front());
359
360                 process_join(result);
361
362                 return 0;
363         }
364
365         int set_param_block(int sz, void * value) {
366                 func.set_param_block(sz, value);
367                 return 0;
368         }
369
370
371         int get_temp_status(host_tuple& result) {
372 //                      temp tuple timestamp should be minimum between
373 //                      minimums of all input queues
374
375                 // find the inputstream in minimum lowebound of the timestamp
376                 int qcmp = compare_qts();
377                 int minq = 0; if(qcmp>0) minq = 1;
378
379                 host_tuple empty_tuple;
380                 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;
381                 host_tuple& left_tuple = empty_tuple;
382                 host_tuple& right_tuple = empty_tuple;
383
384                 if (minq == 0) {
385                         if(max_input_ts[minq]) {
386                                 if (input_queue[minq].empty())
387                                         left_tuple = max_input_tuple[minq];
388                                 else
389                                         left_tuple = input_queue[minq].front();
390                         }
391                 } else {
392                         if(max_input_ts[minq]) {
393                                 if (input_queue[minq].empty())
394                                         right_tuple = max_input_tuple[minq];
395                                 else
396                                         right_tuple = input_queue[minq].front();
397                         }
398                 }
399
400                 result.channel = output_channel;
401                 return func.create_temp_status_tuple(left_tuple, right_tuple, result);
402         }
403
404
405         int get_blocked_status () {
406                 if(input_queue[0].size() > soft_queue_size_limit) return(0);
407                 if(input_queue[1].size() > soft_queue_size_limit) return(1);
408                 return -1;
409         }
410
411         unsigned int get_mem_footprint() {
412                 return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;
413         }
414 };
415
416 #endif  // JOIN_EQ_HASH_OPERATOR_H
417