Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / merge_operator.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef MERGE_OPERATOR_H\r
17 #define MERGE_OPERATOR_H\r
18 \r
19 #include "host_tuple.h"\r
20 #include "base_operator.h"\r
21 #include <list>\r
22 using namespace std;\r
23 \r
24 #include <stdio.h>\r
25 \r
26 \r
27 //int last_tb = 0;\r
28 \r
29 template <class merge_functor>\r
30 class merge_operator : public base_operator {\r
31 private :\r
32         // input channel on which we want to receive the next tuple\r
33         // value -1 indicates that we never received a single tuple\r
34         // and we are not concerned on which channel tuple will arrive\r
35         int wait_channel;\r
36 \r
37         // list of tuples from one of the channel waiting to be compared\r
38         // against tuple from the other channel\r
39         list<host_tuple> merge_queue;\r
40         int merge_qsize;\r
41 \r
42         // comparator object used to compare the tuples\r
43         merge_functor func;\r
44 \r
45         // soft limit on queue size - we consider operator blocked on its input\r
46         // whenever we reach this soft limit (not used anymore)\r
47         size_t soft_queue_size_limit;\r
48 \r
49         int dropped_cnt;\r
50 \r
51         // memory footprint for the merge queue in bytes\r
52         unsigned int queue_mem;\r
53 \r
54         // appends tuple to the end of the merge_queue\r
55         // if tuple is stack resident, makes it heap resident\r
56         void append_tuple(host_tuple& tup) {\r
57                 if (!tup.heap_resident) {\r
58                         char* data = (char*)malloc(tup.tuple_size);\r
59                         memcpy(data, tup.data, tup.tuple_size);\r
60                         tup.data = data;\r
61                         tup.heap_resident = true;\r
62                 }\r
63                 merge_queue.push_back(tup);\r
64                 merge_qsize++;\r
65                 queue_mem += tup.tuple_size;\r
66         }\r
67 \r
68         void purge_queue(int channel, list<host_tuple>& result){\r
69                 if (merge_queue.empty())\r
70                         return;\r
71                 host_tuple top_tuple = merge_queue.front();\r
72                 // remove all the tuple smaller than arrived tuple\r
73                 while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {\r
74                         merge_queue.pop_front();\r
75                         merge_qsize--;\r
76                         queue_mem -= top_tuple.tuple_size;\r
77                         if(merge_qsize<0) abort();\r
78                         func.update_stored_temp_status(top_tuple,channel);\r
79                         func.xform_tuple(top_tuple);\r
80                         top_tuple.channel = output_channel;\r
81                         result.push_back(top_tuple);\r
82 \r
83                         if (merge_queue.empty())\r
84                                 break;\r
85 \r
86                         top_tuple = merge_queue.front();\r
87                         func.update_stored_temp_status(top_tuple,channel);\r
88                 }\r
89         }\r
90 \r
91 \r
92 \r
93 public:\r
94 \r
95         merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {\r
96                 wait_channel = -1;\r
97                 soft_queue_size_limit = size_limit;\r
98                 merge_qsize = 0;\r
99                 dropped_cnt = 0;\r
100                 queue_mem = 0;\r
101         }\r
102 \r
103 \r
104         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
105                 int last_channel;\r
106 \r
107                 if (tup.channel > 1) {\r
108                         fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);\r
109                         return 0;\r
110                 }\r
111 \r
112                 /* reject "out of order" tuple - we can receive those after forced flush */\r
113                 func.get_timestamp(tup);\r
114                 int res = func.compare_with_temp_status(tup.channel);\r
115                 bool is_temp_tuple = func.temp_status_received(tup);\r
116 \r
117 /*\r
118 //                      Ignore temp tuples until we can fix their timestamps.\r
119 if(is_temp_tuple){\r
120  tup.free_tuple();\r
121  return 0;\r
122 }*/\r
123                 if (res < 0){   // out of order tuple\r
124 \r
125                         if (++dropped_cnt % 100000 == 0) {\r
126                                 gslog(LOG_ALERT, "%d tuples dropped by %s  merge\n", dropped_cnt,get_name());\r
127                         }\r
128                         //if(func.print_warnings())\r
129                         //      fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);\r
130                         // free tuple memory\r
131                         tup.free_tuple();\r
132                         return 0;\r
133                 } else {\r
134 \r
135                         if (wait_channel < 0 || wait_channel != tup.channel) {\r
136                                 if (wait_channel < 0)\r
137                                         func.update_temp_status(tup);\r
138 \r
139                                 if (!is_temp_tuple){\r
140                                         if(func.compare_with_temp_status(1-tup.channel) <= 0){\r
141                                                 if (!tup.heap_resident) {\r
142                                                         char* data = (char*)malloc(tup.tuple_size);\r
143                                                         memcpy(data, tup.data, tup.tuple_size);\r
144                                                         tup.data = data;\r
145                                                         tup.heap_resident = true;\r
146                                                 }\r
147                                                 func.xform_tuple(tup);\r
148                                                 tup.channel = output_channel;\r
149                                                 result.push_back(tup);\r
150                                         }else{\r
151                                                 append_tuple(tup);              // put arrived tuple in the queue\r
152                                         }\r
153                                 }\r
154 //                              func.update_temp_status_by_slack(tup, 1-tup.channel);\r
155 //                              purge_queue(tup.channel, result);\r
156 \r
157                                 wait_channel = 1-tup.channel;\r
158                         }else{\r
159         //                              If possible, clear tuples from the other queue.\r
160                                 func.update_temp_status(tup);\r
161                                 purge_queue(1-tup.channel, result);\r
162 \r
163                                 if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple\r
164                                         if (!is_temp_tuple) {\r
165                                                 if (!tup.heap_resident) {\r
166                                                         char* data = (char*)malloc(tup.tuple_size);\r
167                                                         memcpy(data, tup.data, tup.tuple_size);\r
168                                                         tup.data = data;\r
169                                                         tup.heap_resident = true;\r
170                                                 }\r
171                                                 func.xform_tuple(tup);\r
172                                                 tup.channel = output_channel;\r
173                                                 result.push_back(tup);\r
174                                         }\r
175                                 }\r
176                                 else {\r
177                                         if (!is_temp_tuple)\r
178                                                 append_tuple(tup);              // put arrived tuple in the queue\r
179                                         wait_channel = 1 - tup.channel; // now we want the tuple from other channel\r
180                                 }\r
181                         }\r
182 \r
183                 }\r
184 \r
185                 // temp status tuples emited by merge don't serve any other purpose\r
186                 // other than tracing the flow of tuples\r
187                 if (is_temp_tuple) {\r
188                         host_tuple temp_tup;\r
189                         if (!func.create_temp_status_tuple(temp_tup)) {\r
190                                 temp_tup.channel = output_channel;\r
191                                 result.push_back(temp_tup);\r
192                         }\r
193                         // clear memory of heap-resident temporal tuples\r
194                         tup.free_tuple();\r
195                 }\r
196 \r
197                 if (!merge_qsize)\r
198                         wait_channel = -1;\r
199 \r
200                 return 0;\r
201         }\r
202 \r
203         int flush(list<host_tuple>& result) {\r
204 \r
205                 if (merge_queue.empty())\r
206                         return 0;\r
207 \r
208                 host_tuple top_tuple;\r
209                 list<host_tuple>::iterator iter;\r
210                 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {\r
211                         top_tuple = *iter;\r
212                         func.update_stored_temp_status(top_tuple,top_tuple.channel);\r
213                         func.xform_tuple(top_tuple);\r
214                         top_tuple.channel = output_channel;\r
215                         result.push_back(top_tuple);\r
216                 }\r
217 \r
218                 queue_mem = 0;\r
219 \r
220                 return 0;\r
221         }\r
222 \r
223         int set_param_block(int sz, void * value) {\r
224                         return 0;\r
225         }\r
226 \r
227 \r
228         int get_temp_status(host_tuple& result) {\r
229                 result.channel = output_channel;\r
230                 return func.create_temp_status_tuple(result);\r
231         }\r
232 \r
233 \r
234         int get_blocked_status () {\r
235                 if (merge_qsize> soft_queue_size_limit)\r
236                         return wait_channel;\r
237                 else\r
238                         return -1;\r
239         }\r
240 \r
241         unsigned int get_mem_footprint() {\r
242                 return queue_mem;\r
243         }\r
244 \r
245 };\r
246 \r
247 #endif  // MERGE_OPERATOR_H\r