Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / join_eq_hash_operator.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef JOIN_EQ_HASH_OPERATOR_H\r
17 #define JOIN_EQ_HASH_OPERATOR_H\r
18 \r
19 #include "host_tuple.h"\r
20 #include "base_operator.h"\r
21 #include <list>\r
22 #include"hash_table.h"\r
23 using namespace std;\r
24 \r
25 #include <stdio.h>\r
26 \r
27 #define JOIN_OP_INNER_JOIN 0\r
28 #define JOIN_OP_LEFT_OUTER_JOIN 1\r
29 #define JOIN_OP_RIGHT_OUTER_JOIN 2\r
30 #define JOIN_OP_OUTER_JOIN 3\r
31 \r
32 \r
33 #define MAX_TUPLE_SIZE 1024\r
34 \r
35 template <class join_eq_hash_functor, class timestamp, class hashkey, class hasher_func, class equal_func>\r
36 class join_eq_hash_operator : public base_operator {\r
37 private :\r
38         //      type of join : inner vs. outer\r
39         unsigned int join_type;\r
40 int n_calls, n_iters, n_eqk;\r
41 \r
42         // for tracing\r
43         int sch0, sch1;\r
44 \r
45         // list of tuples from one of the channel waiting to be compared\r
46         // against tuple from the other channel\r
47         list<host_tuple> input_queue[2];\r
48 \r
49 //              Admission control timestamp objects\r
50         timestamp *max_input_ts[2], *curr_ts;\r
51         bool hash_empty, curr_ts_valid;\r
52 \r
53 //      max tuples received from input channels\r
54         char max_input_tuple_data[2][MAX_TUPLE_SIZE];\r
55         host_tuple max_input_tuple[2];\r
56 \r
57 //              The hash tables for the join algorithm\r
58         hash_table<hashkey*, host_tuple, hasher_func, equal_func> join_tbl[2];\r
59 \r
60 \r
61         // comparator object used to provide methods for performing the joins.\r
62         join_eq_hash_functor func;\r
63 \r
64         // soft limit on queue size - we consider operator blocked on its input\r
65         // whenever we reach this soft limit (not used anymore)\r
66         size_t soft_queue_size_limit;\r
67 \r
68 //                      For matching on join hash key\r
69         equal_func equal_key;\r
70 \r
71         // memory footprint for the join queues in bytes\r
72         unsigned int queue_mem;\r
73 \r
74 \r
75         // appends tuple to the end of the one of the input queues\r
76         // if tuple is stack resident, makes it heap resident\r
77         int append_tuple(host_tuple& tup, int q) {\r
78                 int ret = input_queue[q].empty() ? 1 : 2;\r
79                 if (!tup.heap_resident) {\r
80                         char* data = (char*)malloc(tup.tuple_size);\r
81                         memcpy(data, tup.data, tup.tuple_size);\r
82                         tup.data = data;\r
83                         tup.heap_resident = true;\r
84                 }\r
85                 input_queue[q].push_back(tup);\r
86                 queue_mem += tup.tuple_size;\r
87                 return ret;\r
88         }\r
89 \r
90 //              -1 if input queue i has smaller ts, 0 it equal, 1 if curr_ts is smaller\r
91         int compare_qts_to_hashts(int i){\r
92                 timestamp tmp_ts;\r
93                 if(max_input_ts[i] == NULL) return(-1);\r
94                 if(input_queue[i].empty())\r
95                         return(func.compare_ts_with_ts(max_input_ts[i], curr_ts));\r
96                 func.load_ts_from_tup(&tmp_ts,input_queue[i].front());\r
97                 return(func.compare_ts_with_ts(&tmp_ts, curr_ts));\r
98         }\r
99 \r
100 //              -1 if q0 is smaller, 1 if q1 is smaller, 0 if equal\r
101         int compare_qts(){\r
102                 if(max_input_ts[0] == NULL) return(-1);\r
103                 if(max_input_ts[1] == NULL) return(1);\r
104                 timestamp tmp_lts, tmp_rts, *lts,*rts;\r
105 \r
106                 if(input_queue[0].empty()){\r
107                         lts = max_input_ts[0];\r
108                 }else{\r
109                         func.load_ts_from_tup(&tmp_lts, input_queue[0].front());\r
110                         lts = &tmp_lts;\r
111                 }\r
112 \r
113                 if(input_queue[1].empty()){\r
114                         rts = max_input_ts[1];\r
115                 }else{\r
116                         func.load_ts_from_tup(&tmp_rts, input_queue[1].front());\r
117                         rts = &tmp_rts;\r
118                 }\r
119 \r
120                 return(func.compare_ts_with_ts(lts,rts));\r
121         }\r
122 \r
123         int compare_tup_with_ts(host_tuple &tup, timestamp *ts){\r
124                 timestamp tmp_ts;\r
125                 func.load_ts_from_tup(&tmp_ts, tup);\r
126                 return(func.compare_ts_with_ts(&tmp_ts, ts));\r
127         }\r
128 \r
129         void process_join(list<host_tuple>& result){\r
130           int i;\r
131           for(i=0;i<2;++i){\r
132                 while(curr_ts_valid && !input_queue[i].empty() && compare_tup_with_ts(input_queue[i].front(), curr_ts) == 0 ){\r
133 //                              apply tuples to join\r
134                         int other = 1-i;        // the other channel\r
135                         bool failed;\r
136 \r
137 //                                      Get tuple from list\r
138                         host_tuple qtup = input_queue[i].front();\r
139                         input_queue[i].pop_front();\r
140                         queue_mem -= qtup.tuple_size;\r
141 \r
142 //                                              Put it into its join table\r
143                         hashkey *qtup_key = func.create_key(qtup,failed); // on heap\r
144                         if(failed){\r
145                                 qtup.free_tuple();\r
146                                 continue;\r
147                         }\r
148                         join_tbl[i].insert(qtup_key, qtup);\r
149 \r
150 //                                              Join with matching tuples in other table.\r
151 \r
152                         typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti = join_tbl[other].find(qtup_key);\r
153                         while(jti != join_tbl[other].end()){\r
154                                 if(equal_key((*jti).first, qtup_key)){\r
155                                   host_tuple otup;\r
156                                   if(i==0)\r
157                                     otup = func.create_output_tuple( qtup, (*jti).second, failed );\r
158                                   else\r
159                                     otup = func.create_output_tuple( (*jti).second, qtup, failed );\r
160                                   if(!failed){\r
161                                         otup.channel = output_channel;\r
162                                         result.push_back(otup);\r
163                                         qtup_key->touch();\r
164                                         (*jti).first->touch();\r
165                                   }\r
166                                 }\r
167                                 jti = jti.next();\r
168                         }\r
169                 }\r
170           }\r
171         }\r
172 \r
173   void process_outer_join(list<host_tuple>& result){\r
174         int i;\r
175         bool failed;\r
176     host_tuple empty_tuple;\r
177         empty_tuple.tuple_size = 0; empty_tuple.data = NULL;\r
178 \r
179         hash_empty = true;\r
180         typename hash_table<hashkey*, host_tuple, hasher_func, equal_func>::iterator jti;\r
181         for(i=0;i<2;++i){\r
182                 if(!join_tbl[i].empty()){\r
183                         if(join_type & (i+1)){\r
184                                 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){\r
185 //              Outer join processing\r
186                                         if( ! (*jti).first->is_touched() ){\r
187                                           host_tuple otup;\r
188                                           if(i==0)\r
189                                         otup = func.create_output_tuple(  (*jti).second, empty_tuple, failed );\r
190                                           else\r
191                                         otup = func.create_output_tuple( empty_tuple, (*jti).second, failed );\r
192                                           if(!failed){\r
193                                                 otup.channel = output_channel;\r
194                                                 result.push_back(otup);\r
195                                           }\r
196                                         }\r
197 //              end outer join processing\r
198 \r
199                                         delete((*jti).first);\r
200                                         (*jti).second.free_tuple();\r
201                                 }\r
202                         }else{\r
203                                 for(jti=join_tbl[i].begin();jti!=join_tbl[i].end();++jti){\r
204                                         delete((*jti).first);\r
205                                         (*jti).second.free_tuple();\r
206                                 }\r
207                         }\r
208                 }\r
209                 join_tbl[i].clear(); join_tbl[i].rehash();\r
210         }\r
211 \r
212   }\r
213 \r
214 public:\r
215         join_eq_hash_operator(int schema_handle0, int schema_handle1, unsigned int jtype, const char* name, size_t size_limit = 10000) : base_operator(name), func(schema_handle0, schema_handle1) {\r
216                 join_type = jtype;\r
217                 max_input_ts[0] = NULL; max_input_ts[1] = NULL;\r
218                 max_input_tuple[0].data = max_input_tuple_data[0];\r
219                 max_input_tuple[1].data = max_input_tuple_data[1];\r
220 \r
221                 curr_ts =  new timestamp();\r
222                 curr_ts_valid = false;\r
223                 hash_empty = true;\r
224                 soft_queue_size_limit = size_limit;\r
225 \r
226                 sch0=schema_handle0;\r
227                 sch1=schema_handle1;\r
228 n_calls=0; n_iters=0; n_eqk=0;\r
229 \r
230                 queue_mem = 0;\r
231         }\r
232 \r
233         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
234                 bool do_join_update = false;\r
235                 int i;\r
236                 bool failed;\r
237 \r
238 //                      Dummy tuple for outer join processing.\r
239                 host_tuple empty_tuple;\r
240                 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;\r
241 \r
242 \r
243                 if (tup.channel > 1) {\r
244                         gslog(LOG_ALERT, "Illegal channel number %d for two-way join, handles=%d, %d\n", tup.channel, sch0, sch1);\r
245                         return 0;\r
246                 }\r
247 \r
248                 bool is_temp_tuple = func.temp_status_received(tup);\r
249 \r
250 //                      Ensure that the queue ts is initialized.\r
251                 if(max_input_ts[tup.channel] == NULL){\r
252                         max_input_ts[tup.channel] = new timestamp();\r
253                         if(! func.load_ts_from_tup(max_input_ts[tup.channel],tup)){\r
254                                 tup.free_tuple();\r
255                                 delete max_input_ts[tup.channel];\r
256                                 max_input_ts[tup.channel] = NULL;\r
257                                 return(0);      // can't load ts -- bail out.\r
258                         }\r
259 \r
260                         if( max_input_ts[1-tup.channel]){\r
261                                 int qcmp = compare_qts();\r
262                                 if(qcmp<=0){\r
263                                         func.load_ts_from_ts(curr_ts, max_input_ts[0]);\r
264                                 }else{\r
265                                         func.load_ts_from_ts(curr_ts, max_input_ts[1]);\r
266                                 }\r
267                                 curr_ts_valid = true;\r
268                         }\r
269                 }\r
270 \r
271 // reject "out of order" tuple - silently.\r
272                 timestamp tup_ts;\r
273                 if(! func.load_ts_from_tup(&tup_ts,tup)){\r
274                         tup.free_tuple();\r
275                         return(0);      // can't load ts -- bail out.\r
276                 }\r
277 \r
278             int tup_order=func.compare_ts_with_ts(&tup_ts,max_input_ts[tup.channel]);\r
279                 if (tup_order < 0){\r
280 printf("out of order ts.\n");\r
281                         tup.free_tuple();\r
282 \r
283                         // even for out of order temporal tuples we need to post new temporal tuple\r
284                         if (is_temp_tuple) {\r
285                                 host_tuple temp_tup;\r
286                                 temp_tup.channel = output_channel;\r
287                                 if (!get_temp_status(temp_tup))\r
288                                         result.push_back(temp_tup);\r
289                         }\r
290                         return  0;\r
291                 }\r
292 \r
293 //      Update max if larger\r
294                 if(tup_order > 0){\r
295                         func.load_ts_from_ts(max_input_ts[tup.channel],&tup_ts);\r
296 \r
297                         // save the content of the max tuple\r
298                         max_input_tuple[tup.channel].channel = tup.channel;\r
299                         max_input_tuple[tup.channel].tuple_size = tup.tuple_size;\r
300                         memcpy(max_input_tuple[tup.channel].data, tup.data, tup.tuple_size);\r
301 \r
302 //                      do_join_update = true;\r
303                 }\r
304 \r
305 //              Add to input queue if it passes the prefilter.\r
306                 if(!is_temp_tuple && func.apply_prefilter(tup)){\r
307                         if(append_tuple(tup,tup.channel) == 1){\r
308                                 do_join_update = true;  // added tuple to empty queue\r
309                         }\r
310                 }else{\r
311                         tup.free_tuple();\r
312                 }\r
313 \r
314 //              If status changed, apply tuples to join.\r
315 //              (updated max time, added tuple to empty queue)\r
316 \r
317 //              clear queues, advance curr_ts\r
318                         if(compare_qts_to_hashts(0)>0 && compare_qts_to_hashts(1)>0){\r
319                                 process_outer_join(result);\r
320 \r
321 \r
322                           int minq = 0;\r
323                           if(compare_qts() > 0)\r
324                                 minq = 1;\r
325                           if(input_queue[minq].empty())\r
326                                 func.load_ts_from_ts(curr_ts,max_input_ts[minq]);\r
327                           else\r
328                                 func.load_ts_from_tup(curr_ts,input_queue[minq].front());\r
329                         }\r
330 \r
331 //                              Process any tuples to be joined.\r
332                                         process_join(result);\r
333 \r
334 \r
335                 // post new temporal tuple\r
336 \r
337                 if(is_temp_tuple) {\r
338                         host_tuple temp_tup;\r
339                         temp_tup.channel = output_channel;\r
340                         if (!get_temp_status(temp_tup))\r
341                                 result.push_back(temp_tup);\r
342                 }\r
343 \r
344                 return 0;\r
345         }\r
346 \r
347         int flush(list<host_tuple>& result) {\r
348 \r
349                 process_outer_join(result);\r
350 \r
351                 int minq = 0;\r
352                 if(compare_qts() > 0)\r
353                         minq = 1;\r
354 \r
355                 if(input_queue[minq].empty())\r
356                         func.load_ts_from_ts(curr_ts,max_input_ts[minq]);\r
357                 else\r
358                         func.load_ts_from_tup(curr_ts,input_queue[minq].front());\r
359 \r
360                 process_join(result);\r
361 \r
362                 return 0;\r
363         }\r
364 \r
365         int set_param_block(int sz, void * value) {\r
366                 func.set_param_block(sz, value);\r
367                 return 0;\r
368         }\r
369 \r
370 \r
371         int get_temp_status(host_tuple& result) {\r
372 //                      temp tuple timestamp should be minimum between\r
373 //                      minimums of all input queues\r
374 \r
375                 // find the inputstream in minimum lowebound of the timestamp\r
376                 int qcmp = compare_qts();\r
377                 int minq = 0; if(qcmp>0) minq = 1;\r
378 \r
379                 host_tuple empty_tuple;\r
380                 empty_tuple.tuple_size = 0; empty_tuple.data = NULL;\r
381                 host_tuple& left_tuple = empty_tuple;\r
382                 host_tuple& right_tuple = empty_tuple;\r
383 \r
384                 if (minq == 0) {\r
385                         if(max_input_ts[minq]) {\r
386                                 if (input_queue[minq].empty())\r
387                                         left_tuple = max_input_tuple[minq];\r
388                                 else\r
389                                         left_tuple = input_queue[minq].front();\r
390                         }\r
391                 } else {\r
392                         if(max_input_ts[minq]) {\r
393                                 if (input_queue[minq].empty())\r
394                                         right_tuple = max_input_tuple[minq];\r
395                                 else\r
396                                         right_tuple = input_queue[minq].front();\r
397                         }\r
398                 }\r
399 \r
400                 result.channel = output_channel;\r
401                 return func.create_temp_status_tuple(left_tuple, right_tuple, result);\r
402         }\r
403 \r
404 \r
405         int get_blocked_status () {\r
406                 if(input_queue[0].size() > soft_queue_size_limit) return(0);\r
407                 if(input_queue[1].size() > soft_queue_size_limit) return(1);\r
408                 return -1;\r
409         }\r
410 \r
411         unsigned int get_mem_footprint() {\r
412                 return join_tbl[0].get_mem_footprint() + join_tbl[1].get_mem_footprint() + queue_mem;\r
413         }\r
414 };\r
415 \r
416 #endif  // JOIN_EQ_HASH_OPERATOR_H\r
417 \r