Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / hfta.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef __HFTA_H\r
17 #define __HFTA_H\r
18 \r
19 #include "gstypes.h"\r
20 #include "host_tuple.h"\r
21 #include "base_operator.h"\r
22 #include <vector>\r
23 #include <map>\r
24 #include<math.h>\r
25 #include "rdtsc.h"\r
26 using namespace std;\r
27 \r
// Hash a 64-bit value (passed by pointer) by XOR-ing its low 32 bits with its high 32 bits.
#define hfta_ullong_hashfunc(x) (((int)(*(x))) ^ ((int)((*(x)) >> 32)))

// Minimum / maximum helpers. One of the arguments is evaluated twice,
// so do not pass expressions with side effects.
#define LLMIN(x,y) (((x) < (y)) ? (x) : (y))
#define LLMAX(x,y) (((x) < (y)) ? (y) : (x))
#define UMIN(x,y) LLMIN(x,y)
#define UMAX(x,y) LLMAX(x,y)

// Comparison helpers
#define EQ(x,y) ((x) == (y))
#define GEQ(x,y) ((x) >= (y))
#define LEQ(x,y) ((x) <= (y))

//	Cast away temporality (identity on the value)
#define non_temporal(x) (x)

//	Access math libraries
#define sqrt(x) sqrt(x)
40 \r
41 \r
42 extern "C" {\r
43 #include <lapp.h>\r
44 #include <fta.h>\r
45 #include <stdlib.h>\r
46 #include <stdio.h>\r
47 #include <schemaparser.h>\r
48 }\r
49 \r
50 struct param_block {\r
51         gs_int32_t block_length;\r
52         void* data;\r
53 };\r
54 \r
55 // forward declaration of operator_node\r
56 struct operator_node;\r
57 \r
58 struct lfta_info {\r
59         gs_schemahandle_t schema_handle;\r
60         gs_sp_t schema;\r
61         gs_sp_t fta_name;\r
62 #ifdef PLAN_DAG\r
63         list<operator_node*> parent_list;\r
64         list<unsigned> out_channel_list;\r
65 #else\r
66         operator_node* parent;\r
67         unsigned output_channel;\r
68 #endif\r
69         FTAID f;\r
70 \r
71         lfta_info() {\r
72                 schema_handle = -1;\r
73                 schema = NULL;\r
74                 #ifndef PLAN_DAG\r
75                         parent = NULL;\r
76                         output_channel = 0;\r
77                 #endif\r
78         }\r
79 \r
80         ~lfta_info() {\r
81                 if (fta_name)\r
82                         free (fta_name);\r
83                 if (schema)\r
84                         free (schema);\r
85                 if (schema_handle >= 0)\r
86                         ftaschema_free(schema_handle);\r
87         }\r
88 };\r
89 \r
90 \r
91 struct operator_node {\r
92         base_operator* op;\r
93 \r
94 #ifdef PLAN_DAG\r
95         list<operator_node*> parent_list;\r
96         list<unsigned> out_channel_list;\r
97 #else\r
98         operator_node* parent;\r
99 #endif\r
100 \r
101         operator_node* left_child;\r
102         operator_node* right_child;\r
103         lfta_info* left_lfta;\r
104         lfta_info* right_lfta;\r
105 \r
106         list<host_tuple> input_queue;\r
107 \r
108         operator_node(base_operator* o) {\r
109                 op = o;\r
110                 #ifndef PLAN_DAG\r
111                         parent = NULL;\r
112                 #endif\r
113                 left_child = right_child = NULL;\r
114                 left_lfta = right_lfta = NULL;\r
115         }\r
116 \r
117         ~operator_node() {\r
118                 delete op;\r
119         }\r
120 \r
121         void set_left_child_node(operator_node* child) {\r
122                 left_child = child;\r
123                 if (child) {\r
124                         #ifdef PLAN_DAG\r
125                                 child->parent_list.push_back(this);\r
126                                 child->out_channel_list.push_back(0);\r
127                         #else\r
128                                 child->parent = this;\r
129                                 child->op->set_output_channel(0);\r
130                         #endif\r
131                 }\r
132         }\r
133 \r
134         void set_right_child_node(operator_node* child) {\r
135                 right_child = child;\r
136                 if (child) {\r
137                         #ifdef PLAN_DAG\r
138                                 child->parent_list.push_back(this);\r
139                                 child->out_channel_list.push_back(1);\r
140                         #else\r
141                                 child->parent = this;\r
142                                 child->op->set_output_channel(1);\r
143                         #endif\r
144                 }\r
145         }\r
146 \r
147         void set_left_lfta(lfta_info* l_lfta) {\r
148                 left_lfta = l_lfta;\r
149                 if (left_lfta) {\r
150                         #ifdef PLAN_DAG\r
151                                 left_lfta->parent_list.push_back(this);\r
152                                 left_lfta->out_channel_list.push_back(0);\r
153                         #else\r
154                                 left_lfta->parent = this;\r
155                                 left_lfta->output_channel = 0;\r
156                         #endif\r
157                 }\r
158         }\r
159 \r
160         void set_right_lfta(lfta_info* r_lfta) {\r
161                 right_lfta = r_lfta;\r
162                 if (right_lfta) {\r
163                         #ifdef PLAN_DAG\r
164                                 right_lfta->parent_list.push_back(this);\r
165                                 right_lfta->out_channel_list.push_back(1);\r
166                         #else\r
167                                 right_lfta->parent = this;\r
168                                 right_lfta->output_channel = 1;\r
169                         #endif\r
170                 }\r
171         }\r
172 \r
173 };\r
174 \r
175 \r
176 \r
177 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);\r
178 void finalize_tuple(host_tuple &tup);\r
179 void finalize_tuple(host_tuple &tup);\r
180 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
181 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
182 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
183 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);\r
184 \r
185 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
186 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
187 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
188 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);\r
189 \r
190 struct UNOP_HFTA {\r
191         struct FTA _fta;\r
192         base_operator* oper;\r
193         FTAID f;\r
194         bool failed;\r
195         gs_schemahandle_t schema_handle;\r
196 \r
197         list<host_tuple> output_queue;\r
198 \r
199         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
200         // lfta_name and an instance of the base_operator\r
201         // We don't need to know the output schema as this information is already embeded\r
202         // in create_output_tuple method of operators' functor.\r
203         UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,\r
204                 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {\r
205 \r
206                 failed = false;\r
207                 oper = op;\r
208                 f = child_ftaid;\r
209                 schema_handle = sch_handle;\r
210 \r
211                 // assign streamid\r
212                 _fta.ftaid = ftaid;\r
213                 _fta.ftaid.streamid = (gs_p_t)this;\r
214 \r
215 #ifdef DEBUG\r
216                 fprintf(stderr,"Instantiate a FTA\n");\r
217 #endif\r
218                 /* extract lfta param block from hfta param block */\r
219                 list<param_block> param_list;\r
220                 get_lfta_params(sz, value, param_list);\r
221                 param_block param = param_list.front();\r
222 \r
223 \r
224         gs_uint32_t reuse_flag = 2;\r
225         // we will try to create a new instance of child FTA only if it is parameterized\r
226         if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
227                 f.streamid = 0; // not interested in existing instances\r
228                 reuse_flag = 0;\r
229                 }\r
230 \r
231                 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {\r
232                 fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
233                         failed = true;\r
234                     return;\r
235             }\r
236 \r
237                 free(param.data);\r
238                 // set the operator's parameters\r
239                 if(oper->set_param_block(sz, (void*)value)) failed = true;;\r
240 \r
241 \r
242                 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);\r
243                 _fta.stream_subscribed_cnt = 1;\r
244                 _fta.stream_subscribed[0] = f;\r
245 \r
246                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)\r
247                 _fta.free_fta = UNOP_HFTA_free_fta;\r
248                 _fta.control_fta = UNOP_HFTA_control_fta;\r
249                 _fta.accept_packet = UNOP_HFTA_accept_packet;\r
250                 _fta.clock_fta = UNOP_HFTA_clock_fta;\r
251         }\r
252 \r
253         ~UNOP_HFTA() {\r
254          delete oper;    // free operators memory\r
255 \r
256      }\r
257 \r
258      int flush() {\r
259                 list<host_tuple> res;\r
260                 if (!oper->flush(res)) {\r
261 \r
262                         if (!res.empty()) {\r
263                                 // go through the list of returned tuples and finalyze them\r
264                                 list<host_tuple>::iterator iter = res.begin();\r
265                                 while (iter != res.end()) {\r
266                                         host_tuple& tup = *iter;\r
267 \r
268                                         // finalize the tuple\r
269                                         if (tup.tuple_size)\r
270                                                 finalize_tuple(tup);\r
271                                         iter++;\r
272                                 }\r
273 \r
274                                 // append returned list to output_queue\r
275                                 output_queue.splice(output_queue.end(), res);\r
276 \r
277 \r
278                                 // post tuples\r
279                                 while (!output_queue.empty()) {\r
280                                         host_tuple& tup = output_queue.front();\r
281                                         #ifdef DEBUG\r
282                                                 fprintf(stderr, "HFTA::about to post tuple\n");\r
283                                         #endif\r
284                                         if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
285                                                 tup.free_tuple();\r
286                                                 output_queue.pop_front();\r
287                                         } else\r
288                                                 break;\r
289                                 }\r
290                         }\r
291                 }\r
292 \r
293                 return 0;\r
294          }\r
295 \r
296         bool init_failed(){return failed;}\r
297 };\r
298 \r
299 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
300         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor\r
301                                                                 // will be called on program exit\r
302 \r
303         if (recursive)\r
304                 // free instance we are subscribed to\r
305                 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);\r
306 \r
307         delete ftap;\r
308         return 0;\r
309 }\r
310 \r
311 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
312         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
313 \r
314         if (command == FTA_COMMAND_FLUSH) {\r
315                 // ask lfta to do the flush\r
316                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
317                 ftap->flush();\r
318 \r
319         } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
320                 // ask lfta to do the flush\r
321                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
322                 ftap->flush();\r
323 \r
324                 /* extract lfta param block from hfta param block */\r
325                 list<param_block> param_list;\r
326                 get_lfta_params(sz, value, param_list);\r
327                 param_block param = param_list.front();\r
328                 // load new parameters into lfta\r
329                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);\r
330                 free(param.data);\r
331 \r
332                 // notify the operator about the change of parameter\r
333                 ftap->oper->set_param_block(sz, (void*)value);\r
334 \r
335         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
336                 // we no longer use temp_status commands\r
337                 // hearbeat mechanism is used instead\r
338         }\r
339         return 0;\r
340 }\r
341 \r
342 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
343         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
344 #ifdef DEBUG\r
345         fprintf(stderr, "HFTA::accepted packet\n");\r
346 #endif\r
347         if (!length)     /* ignore null tuples */\r
348                 return 0;\r
349 \r
350         host_tuple temp;\r
351         temp.tuple_size = length;\r
352         temp.data = packet;\r
353         temp.channel = 0;\r
354         temp.heap_resident = false;\r
355 \r
356         // pass the tuple to operator\r
357         list<host_tuple> res;\r
358         int ret;\r
359         fta_stat* tup_trace = NULL;\r
360         gs_uint32_t tup_trace_sz = 0;\r
361         gs_uint64_t trace_id = 0;\r
362         bool temp_tuple_received = false;\r
363 \r
364 \r
365         // if the tuple is temporal we need to extract the hearbeat payload\r
366         if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {\r
367                 temp_tuple_received = true;\r
368                 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
369                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
370         }\r
371 \r
372         if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {\r
373                 /* perform a flush  */\r
374                 ftap->flush();\r
375 \r
376                 /* post eof_tuple to a parent */\r
377                 host_tuple eof_tuple;\r
378                 ftap->oper->get_temp_status(eof_tuple);\r
379 \r
380                 /* last byte of the tuple specifies the tuple type\r
381                  * set it to EOF_TUPLE\r
382                 */\r
383                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
384                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
385 \r
386                 return 0;\r
387         }\r
388 \r
389         ret = ftap->oper->accept_tuple(temp, res);\r
390 \r
391         // go through the list of returned tuples and finalyze them\r
392         list<host_tuple>::iterator iter = res.begin();\r
393         while (iter != res.end()) {\r
394                 host_tuple& tup = *iter;\r
395 \r
396                 // finalize the tuple\r
397                 if (tup.tuple_size)\r
398                         finalize_tuple(tup);\r
399                 iter++;\r
400         }\r
401 \r
402         // if we received temporal tuple, last tuple of the result must be temporal too\r
403         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
404         if (temp_tuple_received) {\r
405                 fta_stat stats;\r
406                 host_tuple& temp_tup = res.back();\r
407 \r
408 \r
409                 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
410                 char* new_data = (char*)malloc(new_tuple_size);\r
411                 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
412                 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
413                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
414                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));\r
415 \r
416                 memset((char*)&stats, 0, sizeof(fta_stat));\r
417                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
418 \r
419                 // Send a hearbeat message to clearinghouse.\r
420                 fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
421 \r
422                 temp_tup.free_tuple();\r
423                 temp_tup.data = new_data;\r
424                 temp_tup.tuple_size = new_tuple_size;\r
425         }\r
426 \r
427         // append returned list to output_queue\r
428         ftap->output_queue.splice(ftap->output_queue.end(), res);\r
429 \r
430         // post tuples\r
431         while (!ftap->output_queue.empty()) {\r
432                 host_tuple& tup = ftap->output_queue.front();\r
433                 #ifdef DEBUG\r
434                         fprintf(stderr, "HFTA::about to post tuple\n");\r
435                 #endif\r
436                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
437                         tup.free_tuple();\r
438                         ftap->output_queue.pop_front();\r
439                 } else\r
440                         break;\r
441         }\r
442 \r
443         return 1;\r
444 }\r
445 \r
446 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {\r
447 \r
448         // Send a hearbeat message to clearinghouse.to indicate we are alive\r
449         fta_heartbeat(ftap->ftaid, 0, 0, 0);\r
450 \r
451         return 0;\r
452 }\r
453 \r
454 \r
455 struct MULTOP_HFTA {\r
456         struct FTA _fta;\r
457         gs_csp_t fta_name;\r
458         gs_schemahandle_t schema_handle;\r
459         operator_node* root;\r
460         vector<operator_node*> sorted_nodes;\r
461         int num_operators;\r
462         list<lfta_info*> *lfta_list;\r
463         /* number of eof tuples we received so far\r
464          * receiving eof tuples from every source fta will cause a flush\r
465         */\r
466         int num_eof_tuples;\r
467 \r
468         bool failed;\r
469         bool reusable;\r
470 \r
471         list<host_tuple> output_queue;\r
472 \r
473         // Runtime stats\r
474         gs_uint32_t in_tuple_cnt;\r
475         gs_uint32_t out_tuple_cnt;\r
476         gs_uint32_t out_tuple_sz;\r
477         gs_uint64_t cycle_cnt;\r
478 \r
479         gs_uint64_t trace_id;\r
480 \r
481         // memory occupied by output queue\r
482         gs_uint32_t output_queue_mem;\r
483 \r
484 \r
485         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
486         // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,\r
487         // as the schema handle is already passed during operator creation time.\r
488         // We also don't need to know the output schema as this information is already embeded\r
489         // in create_output_tuple method of operators' functor.\r
490 \r
491 \r
492 \r
493         MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,\r
494                 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {\r
495 \r
496                 fta_name = name;\r
497                 failed = false;\r
498 \r
499                 root = node;\r
500                 lfta_list = lftas;\r
501 \r
502                 // assign streamid\r
503                 _fta.ftaid = ftaid;\r
504                 _fta.ftaid.streamid = (gs_p_t)this;\r
505 \r
506                 schema_handle = sch_handle;\r
507 \r
508                 output_queue_mem = 0;\r
509 \r
510                 // topologically sort the operators in the tree (or DAG)\r
511                 // for DAG we make sure we add the node to the sorted list only once\r
512                 operator_node* current_node;\r
513                 map<operator_node*, int> node_map;\r
514                 vector<operator_node*> node_list;\r
515 \r
516                 int i = 0;\r
517                 node_list.push_back(root);\r
518                 node_map[root] = 0;\r
519 \r
520                 num_operators = 1;\r
521 \r
522                 while (i < node_list.size()) {\r
523                         current_node = node_list[i];\r
524                         if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {\r
525                                 node_map[current_node->left_child] = num_operators++;\r
526                                 node_list.push_back(current_node->left_child);\r
527                         }\r
528                         if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {\r
529                                 node_map[current_node->right_child] = num_operators++;\r
530                                 node_list.push_back(current_node->right_child);\r
531                         }\r
532                         i++;\r
533                 }\r
534                 num_operators = i;\r
535 \r
536                 // build adjacency lists for query DAG\r
537                 list<int>* adj_lists = new list<int>[num_operators];\r
538                 bool* leaf_flags = new bool[num_operators];\r
539                 memset(leaf_flags, 0, num_operators * sizeof(bool));\r
540                 for (i = 0; i < num_operators; ++i) {\r
541                         current_node = node_list[i];\r
542                         if (current_node->left_child) {\r
543                                 adj_lists[i].push_back(node_map[current_node->left_child]);\r
544                         }\r
545                         if (current_node->right_child && current_node->left_child != current_node->right_child) {\r
546                                 adj_lists[i].push_back(node_map[current_node->right_child]);\r
547                         }\r
548                 }\r
549 \r
550                 // run topolofical sort\r
551                 bool leaf_found = true;\r
552                 while (leaf_found) {\r
553                         leaf_found = false;\r
554                         // add all leafs to sorted_nodes\r
555                         for (i = 0; i < num_operators; ++i) {\r
556                                 if (!leaf_flags[i] && adj_lists[i].empty()) {\r
557                                         leaf_flags[i] = true;\r
558                                         sorted_nodes.push_back(node_list[i]);\r
559                                         leaf_found = true;\r
560 \r
561                                         // remove the node from its parents adjecency lists\r
562                                         for (int j = 0; j < num_operators; ++j) {\r
563                                                 list<int>::iterator iter;\r
564                                                 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {\r
565                                                         if (*iter == i) {\r
566                                                                 adj_lists[j].erase(iter);\r
567                                                                 break;\r
568                                                         }\r
569                                                 }\r
570                                         }\r
571                                 }\r
572                         }\r
573                 }\r
574 \r
575                 delete[] adj_lists;\r
576                 delete[] leaf_flags;\r
577 \r
578                 // set the parameter block for every operator in tree\r
579                 for (i = 0; i < num_operators; ++i)\r
580                         if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;\r
581 \r
582 #ifdef DEBUG\r
583                 fprintf(stderr,"Instantiate FTAs\n");\r
584 #endif\r
585                 /* extract lfta param block from hfta param block */\r
586 //                      NOTE: param_list must line up with lfta_list\r
587                 list<param_block> param_list;\r
588                 get_lfta_params(sz, value, param_list);\r
589                 list<param_block>::iterator iter1;\r
590                 list<lfta_info*>::iterator iter2 = lfta_list->begin();\r
591 \r
592                 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {\r
593                         lfta_info* inf = *iter2;\r
594 \r
595                 #ifdef DEBUG\r
596                         fprintf(stderr,"Instantiate a FTA\n");\r
597                 #endif\r
598 \r
599                         gs_uint32_t reuse_flag = 2;\r
600 \r
601                         // we will try to create a new instance of child FTA only if it is parameterized\r
602                         if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
603                                 (*iter2)->f.streamid = 0;       // not interested in existing instances\r
604                                 reuse_flag = 0;\r
605                         }\r
606                         if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {\r
607                                 fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
608                                 failed = true;\r
609                                 return;\r
610                         }\r
611 \r
612                         free((*iter1).data);\r
613 \r
614                         //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");\r
615 \r
616                         _fta.stream_subscribed[i]=(*iter2)->f;\r
617                 }\r
618                 _fta.stream_subscribed_cnt = i;\r
619 \r
620                 num_eof_tuples = 0;\r
621 \r
622                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)\r
623                 _fta.free_fta = MULTOP_HFTA_free_fta;\r
624                 _fta.control_fta = MULTOP_HFTA_control_fta;\r
625                 _fta.accept_packet = MULTOP_HFTA_accept_packet;\r
626                 _fta.clock_fta = MULTOP_HFTA_clock_fta;\r
627 \r
628                 // init runtime stats\r
629                 in_tuple_cnt = 0;\r
630                 out_tuple_cnt = 0;\r
631                 out_tuple_sz = 0;\r
632                 cycle_cnt = 0;\r
633 \r
634         }\r
635 \r
636         ~MULTOP_HFTA() {\r
637 \r
638                 list<lfta_info*>::iterator iter;\r
639                 int i = 0;\r
640 \r
641                 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {\r
642                 delete *iter;\r
643                 }\r
644 \r
645                 delete root;    // free operators memory\r
646                 delete lfta_list;\r
647 \r
648 \r
649      }\r
650 \r
651         int flush() {\r
652 \r
653                 list<host_tuple> res;\r
654 \r
655                 // go through the list of operators in topological order\r
656                 // and flush them\r
657                 list<host_tuple>::iterator iter;\r
658                 list<host_tuple> temp_output_queue;\r
659 \r
660                 for (int i = 0; i < num_operators; ++i) {\r
661                         operator_node* node = sorted_nodes[i];\r
662 \r
663 #ifdef PLAN_DAG\r
664                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;\r
665 #else\r
666                         // for trees we can put output tuples directly into parent's input buffer\r
667                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;\r
668 #endif\r
669                         // consume tuples waiting in your queue\r
670                         for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {\r
671                                 node->op->accept_tuple(*(iter), current_output_queue);\r
672                         }\r
673                         node->op->flush(current_output_queue);\r
674                         node->input_queue.clear();\r
675 \r
676 #ifdef PLAN_DAG\r
677                         // copy the tuples from output queue into input queues of all parents\r
678                         list<operator_node*>::iterator node_iter;\r
679 \r
680                         if (!node->parent_list.empty()) {\r
681                                 // append the content of the output queue to parent input queue\r
682 \r
683                                 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {\r
684                                         int* ref_cnt = 0;\r
685                                         if (node->parent_list.size() > 1) {\r
686                                                 ref_cnt = (int*)malloc(sizeof(int));\r
687                                                 *ref_cnt = node->parent_list.size() - 1;\r
688                                         }\r
689 \r
690                                         for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {\r
691                                                 (*iter).ref_cnt = ref_cnt;\r
692                                                 (*node_iter)->input_queue.push_back(*iter);\r
693                                         }\r
694                                 }\r
695                         }\r
696 #endif\r
697                 }\r
698 \r
699                 if (!res.empty()) {\r
700                         // go through the list of returned tuples and finalyze them\r
701                         list<host_tuple>::iterator iter = res.begin();\r
702                         while (iter != res.end()) {\r
703                                 host_tuple& tup = *iter;\r
704 \r
705                                 // finalize the tuple\r
706                                 if (tup.tuple_size)\r
707                                         finalize_tuple(tup);\r
708 \r
709                                 output_queue_mem += tup.tuple_size;\r
710                                 iter++;\r
711                         }\r
712 \r
713                         // append returned list to output_queue\r
714                         output_queue.splice(output_queue.end(), res);\r
715 \r
716 \r
717                         // post tuples\r
718                         while (!output_queue.empty()) {\r
719                                 host_tuple& tup = output_queue.front();\r
720                                 #ifdef DEBUG\r
721                                         fprintf(stderr, "HFTA::about to post tuple\n");\r
722                                 #endif\r
723                                 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
724                                         output_queue_mem -= tup.tuple_size;\r
725                                         tup.free_tuple();\r
726                                         output_queue.pop_front();\r
727                                 } else\r
728                                         break;\r
729                         }\r
730                 }\r
731 \r
732                 return 0;\r
733         }\r
734 \r
735         bool init_failed(){return failed;}\r
736 };\r
737 \r
738 \r
739 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
740         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor\r
741                                                                 // will be called on program exit\r
742 \r
743         if (recursive) {\r
744                 // free instance we are subscribed to\r
745                 list<lfta_info*>::iterator iter;\r
746                 int i = 0;\r
747 \r
748                 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {\r
749                         fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);\r
750                 }\r
751         }\r
752 \r
753         delete ftap;\r
754         return 0;\r
755 }\r
756 \r
757 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
758         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
759 \r
760         if (command == FTA_COMMAND_FLUSH) {\r
761 \r
762                 // ask lftas to do the flush\r
763                 list<lfta_info*>::iterator iter;\r
764                 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {\r
765                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);\r
766                 }\r
767                 // flush hfta operators\r
768                 ftap->flush();\r
769 \r
770         } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
771 \r
772                 list<param_block> param_list;\r
773                 get_lfta_params(sz, value, param_list);\r
774 \r
775                 // ask lftas to do the flush and set new parameters\r
776                 list<lfta_info*>::iterator iter;\r
777                 list<param_block>::iterator iter2;\r
778                 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {\r
779                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);\r
780                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);\r
781                         free((*iter2).data);\r
782                 }\r
783                 // flush hfta operators\r
784                 ftap->flush();\r
785 \r
786                 // set the new parameter block for every operator in tree\r
787                 for (int i = 0; i < ftap->num_operators; ++i)\r
788                         ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);\r
789 \r
790         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
791                 // we no longer use temp_status commands\r
792                 // hearbeat mechanism is used instead\r
793         }\r
794         return 0;\r
795 }\r
796 \r
797 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
798         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
799 \r
800         gs_uint64_t start_cycle = rdtsc();\r
801 #ifdef DEBUG\r
802         fprintf(stderr, "HFTA::accepted packet\n");\r
803 #endif\r
804         if (!length)     /* ignore null tuples */\r
805                 return 0;\r
806 \r
807         ftap->in_tuple_cnt++;\r
808 \r
809         host_tuple temp;\r
810         temp.tuple_size = length;\r
811         temp.data = packet;\r
812         temp.channel = 0;\r
813         temp.heap_resident = false;\r
814 \r
815 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
816 \r
817         // find from which lfta the tuple came\r
818         list<lfta_info*>::iterator iter;\r
819         lfta_info* inf = NULL;\r
820         int i;\r
821 \r
822         for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {\r
823                 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&\r
824                         ftap->_fta.stream_subscribed[i].port == ftaid->port &&\r
825                         ftap->_fta.stream_subscribed[i].index == ftaid->index &&\r
826                         ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {\r
827                         inf = *iter;\r
828                         break;\r
829                 }\r
830         }\r
831 \r
832         if (!inf) {\r
833                 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");\r
834                 exit(1);\r
835         }\r
836 \r
837         // route tuple through operator chain\r
838         list<host_tuple> result;\r
839         host_tuple tup;\r
840         int ret;\r
841         #ifndef PLAN_DAG\r
842                 temp.channel = inf->output_channel;\r
843         #endif\r
844         operator_node* current_node = NULL, *child = NULL;\r
845         list<host_tuple> temp_output_queue;\r
846 \r
847 \r
848         fta_stat* tup_trace = NULL;\r
849         gs_uint32_t tup_trace_sz = 0;\r
850         gs_uint64_t trace_id = 0;\r
851         bool temp_tuple_received = false;\r
852 \r
853         // if the tuple is temporal we need to extract the heartbeat payload\r
854         if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {\r
855                 temp_tuple_received = true;\r
856                 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
857                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
858         }\r
859 \r
860         if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {\r
861 \r
862                 if (++ftap->num_eof_tuples < ftap->lfta_list->size())\r
863                         return 0;\r
864 \r
865                 ftap->num_eof_tuples = 0;\r
866 \r
867                 /* perform a flush  */\r
868                 ftap->flush();\r
869 \r
870                 /* post eof_tuple to a parent */\r
871                 host_tuple eof_tuple;\r
872                 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);\r
873 \r
874                 /* last byte of the tuple specify the tuple type\r
875                  * set it to EOF_TUPLE\r
876                 */\r
877                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
878                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
879                 ftap->out_tuple_cnt++;\r
880                 ftap->out_tuple_sz+=eof_tuple.tuple_size;\r
881 \r
882                 return 0;\r
883         }\r
884 \r
885         list<host_tuple>::iterator iter2;\r
886 \r
887 #ifdef PLAN_DAG\r
888 \r
889         // push tuple to all parent operators of the lfta\r
890         list<operator_node*>::iterator node_iter;\r
891         list<unsigned>::iterator chan_iter;\r
892         for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {\r
893                 temp.channel = *chan_iter;\r
894                 (*node_iter)->input_queue.push_back(temp);\r
895         }\r
896 \r
897         for (i = 0; i < ftap->num_operators; ++i) {\r
898 \r
899                 operator_node* node = ftap->sorted_nodes[i];\r
900                 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;\r
901 \r
902                 // consume tuples waiting in your queue\r
903                 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {\r
904                         node->op->accept_tuple(*(iter2), current_output_queue);\r
905                 }\r
906                 node->input_queue.clear();\r
907 \r
908                 // copy the tuples from output queue into input queues of all parents\r
909 \r
910                 if (!node->parent_list.empty()) {\r
911 \r
912                         // append the content of the output queue to parent input queue\r
913                         for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {\r
914 \r
915                                 int* ref_cnt = 0;\r
916                                 if (node->parent_list.size() > 1) {\r
917                                         ref_cnt = (int*)malloc(sizeof(int));\r
918                                         *ref_cnt = node->parent_list.size() - 1;\r
919                                 }\r
920 \r
921                                 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {\r
922                                         (*iter2).ref_cnt = ref_cnt;\r
923                                         (*iter2).channel = *chan_iter;\r
924                                         (*node_iter)->input_queue.push_back(*iter2);\r
925                                 }\r
926                         }\r
927                 }\r
928                 temp_output_queue.clear();\r
929         }\r
930 #else\r
931         current_node = inf->parent;\r
932 \r
933 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
934         current_node->input_queue.push_back(temp);\r
935 \r
936         do {\r
937 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);\r
938                 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;\r
939 \r
940                 // consume tuples waiting in your queue\r
941                 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {\r
942                         current_node->op->accept_tuple((*iter2),current_output_queue);\r
943                 }\r
944 //                      All consumed, delete them\r
945                 current_node->input_queue.clear();\r
946                 current_node = current_node->parent;\r
947 \r
948         } while (current_node);\r
949 #endif\r
950 \r
951 \r
952         host_tuple temp_tup;\r
953 \r
954         bool no_temp_tuple = false;\r
955 \r
956         // if we received temporal tuple, last tuple of the result must be temporal too\r
957         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
958         if (temp_tuple_received) {\r
959 \r
960                 if (result.empty()) {\r
961                         no_temp_tuple = true;\r
962 \r
963                 } else {\r
964                         fta_stat stats;\r
965                         temp_tup = result.back();\r
966                         finalize_tuple(temp_tup);\r
967 \r
968                         int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
969                         char* new_data = (char*)malloc(new_tuple_size);\r
970                         memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
971                         memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
972                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
973 \r
974 \r
975                         memset((char*)&stats, 0, sizeof(fta_stat));\r
976                         stats.ftaid = fta->ftaid;\r
977                         stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
978                         stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
979                         stats.out_tuple_sz = ftap->out_tuple_sz;\r
980                         stats.cycle_cnt = ftap->cycle_cnt;\r
981                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
982 \r
983                         // Send a hearbeat message to clearinghouse.\r
984                         fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
985 \r
986                         // reset the stats\r
987                         ftap->in_tuple_cnt = 0;\r
988                         ftap->out_tuple_cnt = 0;\r
989                         ftap->out_tuple_sz = 0;\r
990                         ftap->cycle_cnt = 0;\r
991 \r
992                         free(temp_tup.data);\r
993                         temp_tup.data = new_data;\r
994                         temp_tup.tuple_size = new_tuple_size;\r
995                         result.pop_back();\r
996                 }\r
997         }\r
998 \r
999         // go through the list of returned tuples and finalyze them\r
1000         // since we can produce multiple temporal tuples in DAG plans\r
1001         // we can drop all of them except the last one\r
1002         iter2 = result.begin();\r
1003         while(iter2 != result.end()) {\r
1004                 host_tuple tup = *iter2;\r
1005 \r
1006                 // finalize the tuple\r
1007                 if (tup.tuple_size) {\r
1008                         finalize_tuple(tup);\r
1009 \r
1010         #ifdef PLAN_DAG\r
1011                 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))\r
1012                         tup.free_tuple();\r
1013                 else\r
1014         #endif\r
1015                         {\r
1016                         ftap->output_queue.push_back(tup);\r
1017                         ftap->output_queue_mem += tup.tuple_size;\r
1018                         }\r
1019 \r
1020                 }\r
1021                 iter2++;\r
1022         }\r
1023 \r
1024         // append returned list to output_queue\r
1025         // ftap->output_queue.splice(ftap->output_queue.end(), result);\r
1026 \r
1027         if (temp_tuple_received && !no_temp_tuple) {\r
1028                 ftap->output_queue.push_back(temp_tup);\r
1029                 ftap->output_queue_mem += temp_tup.tuple_size;\r
1030         }\r
1031 \r
1032         // post tuples\r
1033         while (!ftap->output_queue.empty()) {\r
1034                 host_tuple& tup = ftap->output_queue.front();\r
1035                 #ifdef DEBUG\r
1036                         fprintf(stderr, "HFTA::about to post tuple\n");\r
1037                 #endif\r
1038                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
1039                         ftap->out_tuple_cnt++;\r
1040                         ftap->out_tuple_sz+=tup.tuple_size;\r
1041                         ftap->output_queue_mem -= tup.tuple_size;\r
1042                         tup.free_tuple();\r
1043                         ftap->output_queue.pop_front();\r
1044                 } else\r
1045                         break;\r
1046         }\r
1047 \r
1048         ftap->cycle_cnt += rdtsc() - start_cycle;\r
1049 \r
1050         return 1;\r
1051 }\r
1052 \r
1053 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {\r
1054         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
1055 \r
1056 #ifdef HFTA_PROFILE\r
1057         /* Print stats */\r
1058         fprintf(stderr, "FTA = %s|", ftap->fta_name);\r
1059         fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);\r
1060         fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);\r
1061         fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);\r
1062         fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);\r
1063 \r
1064 \r
1065                 fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());\r
1066                 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();\r
1067 \r
1068                 for (int i = 1; i < ftap->num_operators; ++i) {\r
1069                         operator_node* node = ftap->sorted_nodes[i];\r
1070                         fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());\r
1071                         total_mem += node->op->get_mem_footprint();\r
1072                 }\r
1073                 fprintf(stderr, ", total = %d|", total_mem );\r
1074                 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );\r
1075 #endif\r
1076 \r
1077         fta_stat stats;\r
1078         memset((char*)&stats, 0, sizeof(fta_stat));\r
1079         stats.ftaid = fta->ftaid;\r
1080         stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
1081         stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
1082         stats.out_tuple_sz = ftap->out_tuple_sz;\r
1083         stats.cycle_cnt = ftap->cycle_cnt;\r
1084 \r
1085         // Send a hearbeat message to clearinghouse.\r
1086         fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);\r
1087 \r
1088         // resets runtime stats\r
1089         ftap->in_tuple_cnt = 0;\r
1090         ftap->out_tuple_cnt = 0;\r
1091         ftap->out_tuple_sz = 0;\r
1092         ftap->cycle_cnt = 0;\r
1093 \r
1094         return 0;\r
1095 }\r
1096 \r
1097 \r
1098 #endif  // __HFTA_H\r
1099 \r