Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / merge_operator_oop.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef MERGE_OPERATOR_OOP_H\r
17 #define MERGE_OPERATOR_OOP_H\r
18 \r
19 #include "host_tuple.h"\r
20 #include "base_operator.h"\r
21 #include <list>\r
22 using namespace std;\r
23 \r
24 #include <stdio.h>\r
25 \r
26 \r
27 //int last_tb = 0;\r
28 \r
29 template <class merge_functor>\r
30 class merge_operator_oop : public base_operator {\r
31 private :\r
32         // input channel on which we want to receive the next tuple\r
33         // value -1 indicates that we never received a single tuple\r
34         // and we are not concerned on which channel tuple will arrive\r
35         int wait_channel;\r
36 \r
37         // list of tuples from one of the channel waiting to be compared\r
38         // against tuple from the other channel\r
39         list<host_tuple> merge_queue;\r
40         int merge_qsize;\r
41 \r
42         // comparator object used to compare the tuples\r
43         merge_functor func;\r
44 \r
45         // soft limit on queue size - we consider operator blocked on its input\r
46         // whenever we reach this soft limit (not used anymore)\r
47         size_t soft_queue_size_limit;\r
48 \r
49         int dropped_cnt;\r
50 \r
51         // memory footprint for the merge queue in bytes\r
52         unsigned int queue_mem;\r
53 \r
54         // appends tuple to the end of the merge_queue\r
55 // tuple is stack resident, makes it heap resident\r
56         void append_tuple(host_tuple& tup) {\r
57                 if (!tup.heap_resident) {\r
58                         char* data = (char*)malloc(tup.tuple_size);\r
59                         memcpy(data, tup.data, tup.tuple_size);\r
60                         tup.data = data;\r
61                         tup.heap_resident = true;\r
62                 }\r
63                 merge_queue.push_back(tup);\r
64                 merge_qsize++;\r
65                 queue_mem += tup.tuple_size;\r
66         }\r
67 \r
68 \r
69         // purge tuples in the queue, which are from stream channel -- Jin\r
70         void purge_queue(int channel, list<host_tuple>& result){\r
71                 if (merge_queue.empty())\r
72                         return;\r
73                 host_tuple top_tuple = merge_queue.front();\r
74                 // remove all the tuple smaller than arrived tuple\r
75                 while(func.compare_with_temp_status(top_tuple, 1-channel) <= 0) {\r
76                         merge_queue.pop_front();\r
77                         merge_qsize--;\r
78                         queue_mem -= top_tuple.tuple_size;\r
79                         if(merge_qsize<0) abort();\r
80                         func.update_temp_status(top_tuple,channel);\r
81                         func.xform_tuple(top_tuple);\r
82                         top_tuple.channel = output_channel;\r
83                         result.push_back(top_tuple);\r
84 \r
85                         if (merge_queue.empty())\r
86                                 break;\r
87 \r
88                         top_tuple = merge_queue.front();\r
89                         func.update_temp_status(top_tuple,channel);\r
90                 }\r
91         }\r
92 \r
93 \r
94 \r
95 public:\r
96 \r
97         merge_operator_oop(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {\r
98                 wait_channel = -1;\r
99                 soft_queue_size_limit = size_limit;\r
100                 merge_qsize = 0;\r
101                 dropped_cnt = 0;\r
102                 queue_mem = 0;\r
103         }\r
104 \r
105         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
106                 int last_channel;\r
107 \r
108                 if (tup.channel > 1) {\r
109                         fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);\r
110                         return 0;\r
111                 }\r
112 \r
113                 func.get_timestamp(tup);\r
114                 func.compare_with_temp_status(tup.channel);\r
115                 bool is_temp_tuple = func.temp_status_received(tup);\r
116 \r
117                 if (!is_temp_tuple) {\r
118                         result.push_back(tup);\r
119                 } else {\r
120                         func.update_temp_status(tup);\r
121 \r
122                         host_tuple temp_tup;\r
123                         if (!func.create_temp_status_tuple(temp_tup)) {\r
124                                 temp_tup.channel = output_channel;\r
125                                 result.push_back(temp_tup);\r
126                         }\r
127                         // clear memory of heap-resident temporal tuples\r
128                         tup.free_tuple();\r
129                 }\r
130 \r
131                 return 0;\r
132         }\r
133 \r
134 \r
135 \r
136         /* where is this called?  when using the blocking mode -- Jin */\r
137         int flush(list<host_tuple>& result) {\r
138 \r
139                 if (merge_queue.empty())\r
140                         return 0;\r
141 \r
142                 host_tuple top_tuple;\r
143                 list<host_tuple>::iterator iter;\r
144                 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {\r
145                         top_tuple = *iter;\r
146                         func.update_stored_temp_status(top_tuple,top_tuple.channel);\r
147                         func.xform_tuple(top_tuple);\r
148                         top_tuple.channel = output_channel;\r
149                         result.push_back(top_tuple);\r
150                 }\r
151 \r
152                 queue_mem = 0;\r
153 \r
154                 return 0;\r
155         }\r
156 \r
157         int set_param_block(int sz, void * value) {\r
158                         return 0;\r
159         }\r
160 \r
161 \r
162         int get_temp_status(host_tuple& result) {\r
163                 result.channel = output_channel;\r
164                 return func.create_temp_status_tuple(result);\r
165         }\r
166 \r
167 \r
168         int get_blocked_status () {\r
169                 if (merge_qsize> soft_queue_size_limit)\r
170                         return wait_channel;\r
171                 else\r
172                         return -1;\r
173         }\r
174 \r
175         unsigned int get_mem_footprint() {\r
176                 return queue_mem;\r
177         }\r
178 \r
179 };\r
180 \r
181 #endif  // MERGE_OPERATOR_OOP_H\r
182 \r