Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / include / hfta / merge_operator_oop.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef MERGE_OPERATOR_OOP_H
17 #define MERGE_OPERATOR_OOP_H
18
19 #include "host_tuple.h"
20 #include "base_operator.h"
21 #include <list>
22 using namespace std;
23
24 #include <stdio.h>
25
26
27 //int last_tb = 0;
28
29 template <class merge_functor>
30 class merge_operator_oop : public base_operator {
31 private :
32         // input channel on which we want to receive the next tuple
33         // value -1 indicates that we never received a single tuple
34         // and we are not concerned on which channel tuple will arrive
35         int wait_channel;
36
37         // list of tuples from one of the channel waiting to be compared
38         // against tuple from the other channel
39         list<host_tuple> merge_queue;
40         int merge_qsize;
41
42         // comparator object used to compare the tuples
43         merge_functor func;
44
45         // soft limit on queue size - we consider operator blocked on its input
46         // whenever we reach this soft limit (not used anymore)
47         size_t soft_queue_size_limit;
48
49         int dropped_cnt;
50
51         // memory footprint for the merge queue in bytes
52         unsigned int queue_mem;
53
54         // appends tuple to the end of the merge_queue
55 // tuple is stack resident, makes it heap resident
56         void append_tuple(host_tuple& tup) {
57                 if (!tup.heap_resident) {
58                         char* data = (char*)malloc(tup.tuple_size);
59                         memcpy(data, tup.data, tup.tuple_size);
60                         tup.data = data;
61                         tup.heap_resident = true;
62                 }
63                 merge_queue.push_back(tup);
64                 merge_qsize++;
65                 queue_mem += tup.tuple_size;
66         }
67
68
69         // purge tuples in the queue, which are from stream channel -- Jin
70         void purge_queue(int channel, list<host_tuple>& result){
71                 if (merge_queue.empty())
72                         return;
73                 host_tuple top_tuple = merge_queue.front();
74                 // remove all the tuple smaller than arrived tuple
75                 while(func.compare_with_temp_status(top_tuple, 1-channel) <= 0) {
76                         merge_queue.pop_front();
77                         merge_qsize--;
78                         queue_mem -= top_tuple.tuple_size;
79                         if(merge_qsize<0) abort();
80                         func.update_temp_status(top_tuple,channel);
81                         func.xform_tuple(top_tuple);
82                         top_tuple.channel = output_channel;
83                         result.push_back(top_tuple);
84
85                         if (merge_queue.empty())
86                                 break;
87
88                         top_tuple = merge_queue.front();
89                         func.update_temp_status(top_tuple,channel);
90                 }
91         }
92
93
94
95 public:
96
97         merge_operator_oop(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
98                 wait_channel = -1;
99                 soft_queue_size_limit = size_limit;
100                 merge_qsize = 0;
101                 dropped_cnt = 0;
102                 queue_mem = 0;
103         }
104
105         int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
106                 int last_channel;
107
108                 if (tup.channel > 1) {
109                         fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
110                         return 0;
111                 }
112
113                 func.get_timestamp(tup);
114                 func.compare_with_temp_status(tup.channel);
115                 bool is_temp_tuple = func.temp_status_received(tup);
116
117                 if (!is_temp_tuple) {
118                         result.push_back(tup);
119                 } else {
120                         func.update_temp_status(tup);
121
122                         host_tuple temp_tup;
123                         if (!func.create_temp_status_tuple(temp_tup)) {
124                                 temp_tup.channel = output_channel;
125                                 result.push_back(temp_tup);
126                         }
127                         // clear memory of heap-resident temporal tuples
128                         tup.free_tuple();
129                 }
130
131                 return 0;
132         }
133
134
135
136         /* where is this called?  when using the blocking mode -- Jin */
137         int flush(list<host_tuple>& result) {
138
139                 if (merge_queue.empty())
140                         return 0;
141
142                 host_tuple top_tuple;
143                 list<host_tuple>::iterator iter;
144                 for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
145                         top_tuple = *iter;
146                         func.update_stored_temp_status(top_tuple,top_tuple.channel);
147                         func.xform_tuple(top_tuple);
148                         top_tuple.channel = output_channel;
149                         result.push_back(top_tuple);
150                 }
151
152                 queue_mem = 0;
153
154                 return 0;
155         }
156
157         int set_param_block(int sz, void * value) {
158                         return 0;
159         }
160
161
162         int get_temp_status(host_tuple& result) {
163                 result.channel = output_channel;
164                 return func.create_temp_status_tuple(result);
165         }
166
167
168         int get_blocked_status () {
169                 if (merge_qsize> soft_queue_size_limit)
170                         return wait_channel;
171                 else
172                         return -1;
173         }
174
175         unsigned int get_mem_footprint() {
176                 return queue_mem;
177         }
178
179 };
180
181 #endif  // MERGE_OPERATOR_OOP_H
182