Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / hfta.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef __HFTA_H
17 #define __HFTA_H
18
19 #include "gstypes.h"
20 #include "host_tuple.h"
21 #include "base_operator.h"
22 #include <vector>
23 #include <map>
24 #include<math.h>
25 #include "rdtsc.h"
26 using namespace std;
27
// Hash helper: dereferences its pointer argument and XORs the low 32 bits
// with the bits above bit 31 (intended for 64-bit values).
// The argument is evaluated twice.
#define hfta_ullong_hashfunc(p) (((int)(*(p)))^((int)((*(p))>>32)))

// Minimum / maximum selectors. Each argument may be evaluated twice,
// so do not pass expressions with side effects.
#define LLMIN(a,b) ((a)<(b)?(a):(b))
#define LLMAX(a,b) ((a)<(b)?(b):(a))
#define UMIN(a,b) ((a)<(b)?(a):(b))
#define UMAX(a,b) ((a)<(b)?(b):(a))

// Comparison predicates spelled as function-like macros.
#define EQ(a,b) ((a)==(b))
#define GEQ(a,b) ((a)>=(b))
#define LEQ(a,b) ((a)<=(b))

//	Cast away temporality (expands to its argument unchanged)
#define non_temporal(v) (v)

//	Access math libraries (expands to a plain call to sqrt)
#define sqrt(v) sqrt(v)
40
41
42 extern "C" {
43 #include <lapp.h>
44 #include <fta.h>
45 #include <stdlib.h>
46 #include <stdio.h>
47 #include <schemaparser.h>
48 }
49
50 struct param_block {
51         gs_int32_t block_length;
52         void* data;
53 };
54
55 // forward declaration of operator_node
56 struct operator_node;
57
58 struct lfta_info {
59         gs_schemahandle_t schema_handle;
60         gs_sp_t schema;
61         gs_sp_t fta_name;
62 #ifdef PLAN_DAG
63         list<operator_node*> parent_list;
64         list<unsigned> out_channel_list;
65 #else
66         operator_node* parent;
67         unsigned output_channel;
68 #endif
69         FTAID f;
70
71         lfta_info() {
72                 schema_handle = -1;
73                 schema = NULL;
74                 #ifndef PLAN_DAG
75                         parent = NULL;
76                         output_channel = 0;
77                 #endif
78         }
79
80         ~lfta_info() {
81                 if (fta_name)
82                         free (fta_name);
83                 if (schema)
84                         free (schema);
85                 if (schema_handle >= 0)
86                         ftaschema_free(schema_handle);
87         }
88 };
89
90
91 struct operator_node {
92         base_operator* op;
93
94 #ifdef PLAN_DAG
95         list<operator_node*> parent_list;
96         list<unsigned> out_channel_list;
97 #else
98         operator_node* parent;
99 #endif
100
101         operator_node* left_child;
102         operator_node* right_child;
103         lfta_info* left_lfta;
104         lfta_info* right_lfta;
105
106         list<host_tuple> input_queue;
107
108         operator_node(base_operator* o) {
109                 op = o;
110                 #ifndef PLAN_DAG
111                         parent = NULL;
112                 #endif
113                 left_child = right_child = NULL;
114                 left_lfta = right_lfta = NULL;
115         }
116
117         ~operator_node() {
118                 delete op;
119         }
120
121         void set_left_child_node(operator_node* child) {
122                 left_child = child;
123                 if (child) {
124                         #ifdef PLAN_DAG
125                                 child->parent_list.push_back(this);
126                                 child->out_channel_list.push_back(0);
127                         #else
128                                 child->parent = this;
129                                 child->op->set_output_channel(0);
130                         #endif
131                 }
132         }
133
134         void set_right_child_node(operator_node* child) {
135                 right_child = child;
136                 if (child) {
137                         #ifdef PLAN_DAG
138                                 child->parent_list.push_back(this);
139                                 child->out_channel_list.push_back(1);
140                         #else
141                                 child->parent = this;
142                                 child->op->set_output_channel(1);
143                         #endif
144                 }
145         }
146
147         void set_left_lfta(lfta_info* l_lfta) {
148                 left_lfta = l_lfta;
149                 if (left_lfta) {
150                         #ifdef PLAN_DAG
151                                 left_lfta->parent_list.push_back(this);
152                                 left_lfta->out_channel_list.push_back(0);
153                         #else
154                                 left_lfta->parent = this;
155                                 left_lfta->output_channel = 0;
156                         #endif
157                 }
158         }
159
160         void set_right_lfta(lfta_info* r_lfta) {
161                 right_lfta = r_lfta;
162                 if (right_lfta) {
163                         #ifdef PLAN_DAG
164                                 right_lfta->parent_list.push_back(this);
165                                 right_lfta->out_channel_list.push_back(1);
166                         #else
167                                 right_lfta->parent = this;
168                                 right_lfta->output_channel = 1;
169                         #endif
170                 }
171         }
172
173 };
174
175
176
177 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
178 void finalize_tuple(host_tuple &tup);
179 void finalize_tuple(host_tuple &tup);
180 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
181 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
182 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
183 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
184
185 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
186 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
187 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
188 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
189
190 struct UNOP_HFTA {
191         struct FTA _fta;
192         base_operator* oper;
193         FTAID f;
194         bool failed;
195         gs_schemahandle_t schema_handle;
196
197         list<host_tuple> output_queue;
198
199         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
200         // lfta_name and an instance of the base_operator
201         // We don't need to know the output schema as this information is already embeded
202         // in create_output_tuple method of operators' functor.
203         UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
204                 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
205
206                 failed = false;
207                 oper = op;
208                 f = child_ftaid;
209                 schema_handle = sch_handle;
210
211                 // assign streamid
212                 _fta.ftaid = ftaid;
213                 _fta.ftaid.streamid = (gs_p_t)this;
214
215 #ifdef DEBUG
216                 fprintf(stderr,"Instantiate a FTA\n");
217 #endif
218                 /* extract lfta param block from hfta param block */
219                 list<param_block> param_list;
220                 get_lfta_params(sz, value, param_list);
221                 param_block param = param_list.front();
222
223
224         gs_uint32_t reuse_flag = 2;
225         // we will try to create a new instance of child FTA only if it is parameterized
226         if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
227                 f.streamid = 0; // not interested in existing instances
228                 reuse_flag = 0;
229                 }
230
231                 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
232                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
233                         failed = true;
234                     return;
235             }
236
237                 free(param.data);
238                 // set the operator's parameters
239                 if(oper->set_param_block(sz, (void*)value)) failed = true;;
240
241
242                 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
243                 _fta.stream_subscribed_cnt = 1;
244                 _fta.stream_subscribed[0] = f;
245
246                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
247                 _fta.free_fta = UNOP_HFTA_free_fta;
248                 _fta.control_fta = UNOP_HFTA_control_fta;
249                 _fta.accept_packet = UNOP_HFTA_accept_packet;
250                 _fta.clock_fta = UNOP_HFTA_clock_fta;
251         }
252
253         ~UNOP_HFTA() {
254          delete oper;    // free operators memory
255
256      }
257
258      int flush() {
259                 list<host_tuple> res;
260                 if (!oper->flush(res)) {
261
262                         if (!res.empty()) {
263                                 // go through the list of returned tuples and finalyze them
264                                 list<host_tuple>::iterator iter = res.begin();
265                                 while (iter != res.end()) {
266                                         host_tuple& tup = *iter;
267
268                                         // finalize the tuple
269                                         if (tup.tuple_size)
270                                                 finalize_tuple(tup);
271                                         iter++;
272                                 }
273
274                                 // append returned list to output_queue
275                                 output_queue.splice(output_queue.end(), res);
276
277
278                                 // post tuples
279                                 while (!output_queue.empty()) {
280                                         host_tuple& tup = output_queue.front();
281                                         #ifdef DEBUG
282                                                 fprintf(stderr, "HFTA::about to post tuple\n");
283                                         #endif
284                                         if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
285                                                 tup.free_tuple();
286                                                 output_queue.pop_front();
287                                         } else
288                                                 break;
289                                 }
290                         }
291                 }
292
293                 return 0;
294          }
295
296         bool init_failed(){return failed;}
297 };
298
299 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
300         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor
301                                                                 // will be called on program exit
302
303         if (recursive)
304                 // free instance we are subscribed to
305                 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
306
307         delete ftap;
308         return 0;
309 }
310
311 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
312         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
313
314         if (command == FTA_COMMAND_FLUSH) {
315                 // ask lfta to do the flush
316                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
317                 ftap->flush();
318
319         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
320                 // ask lfta to do the flush
321                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
322                 ftap->flush();
323
324                 /* extract lfta param block from hfta param block */
325                 list<param_block> param_list;
326                 get_lfta_params(sz, value, param_list);
327                 param_block param = param_list.front();
328                 // load new parameters into lfta
329                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
330                 free(param.data);
331
332                 // notify the operator about the change of parameter
333                 ftap->oper->set_param_block(sz, (void*)value);
334
335         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
336                 // we no longer use temp_status commands
337                 // hearbeat mechanism is used instead
338         }
339         return 0;
340 }
341
342 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
343         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
344 #ifdef DEBUG
345         fprintf(stderr, "HFTA::accepted packet\n");
346 #endif
347         if (!length)     /* ignore null tuples */
348                 return 0;
349
350         host_tuple temp;
351         temp.tuple_size = length;
352         temp.data = packet;
353         temp.channel = 0;
354         temp.heap_resident = false;
355
356         // pass the tuple to operator
357         list<host_tuple> res;
358         int ret;
359         fta_stat* tup_trace = NULL;
360         gs_uint32_t tup_trace_sz = 0;
361         gs_uint64_t trace_id = 0;
362         bool temp_tuple_received = false;
363
364
365         // if the tuple is temporal we need to extract the hearbeat payload
366         if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
367                 temp_tuple_received = true;
368                 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
369                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
370         }
371
372         if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
373                 /* perform a flush  */
374                 ftap->flush();
375
376                 /* post eof_tuple to a parent */
377                 host_tuple eof_tuple;
378                 ftap->oper->get_temp_status(eof_tuple);
379
380                 /* last byte of the tuple specifies the tuple type
381                  * set it to EOF_TUPLE
382                 */
383                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
384                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
385
386                 return 0;
387         }
388
389         ret = ftap->oper->accept_tuple(temp, res);
390
391         // go through the list of returned tuples and finalyze them
392         list<host_tuple>::iterator iter = res.begin();
393         while (iter != res.end()) {
394                 host_tuple& tup = *iter;
395
396                 // finalize the tuple
397                 if (tup.tuple_size)
398                         finalize_tuple(tup);
399                 iter++;
400         }
401
402         // if we received temporal tuple, last tuple of the result must be temporal too
403         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
404         if (temp_tuple_received) {
405                 fta_stat stats;
406                 host_tuple& temp_tup = res.back();
407
408
409                 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
410                 char* new_data = (char*)malloc(new_tuple_size);
411                 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
412                 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
413                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
414                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
415
416                 memset((char*)&stats, 0, sizeof(fta_stat));
417                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
418
419                 // Send a hearbeat message to clearinghouse.
420                 fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
421
422                 temp_tup.free_tuple();
423                 temp_tup.data = new_data;
424                 temp_tup.tuple_size = new_tuple_size;
425         }
426
427         // append returned list to output_queue
428         ftap->output_queue.splice(ftap->output_queue.end(), res);
429
430         // post tuples
431         while (!ftap->output_queue.empty()) {
432                 host_tuple& tup = ftap->output_queue.front();
433                 #ifdef DEBUG
434                         fprintf(stderr, "HFTA::about to post tuple\n");
435                 #endif
436                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
437                         tup.free_tuple();
438                         ftap->output_queue.pop_front();
439                 } else
440                         break;
441         }
442
443         return 1;
444 }
445
446 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
447
448         // Send a hearbeat message to clearinghouse.to indicate we are alive
449         fta_heartbeat(ftap->ftaid, 0, 0, 0);
450
451         return 0;
452 }
453
454
455 struct MULTOP_HFTA {
456         struct FTA _fta;
457         gs_csp_t fta_name;
458         gs_schemahandle_t schema_handle;
459         operator_node* root;
460         vector<operator_node*> sorted_nodes;
461         int num_operators;
462         list<lfta_info*> *lfta_list;
463         /* number of eof tuples we received so far
464          * receiving eof tuples from every source fta will cause a flush
465         */
466         int num_eof_tuples;
467
468         bool failed;
469         bool reusable;
470
471         list<host_tuple> output_queue;
472
473         // Runtime stats
474         gs_uint32_t in_tuple_cnt;
475         gs_uint32_t out_tuple_cnt;
476         gs_uint32_t out_tuple_sz;
477         gs_uint64_t cycle_cnt;
478
479         gs_uint64_t trace_id;
480
481         // memory occupied by output queue
482         gs_uint32_t output_queue_mem;
483
484
485         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
486         // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
487         // as the schema handle is already passed during operator creation time.
488         // We also don't need to know the output schema as this information is already embeded
489         // in create_output_tuple method of operators' functor.
490
491
492
493         MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
494                 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
495
496                 fta_name = name;
497                 failed = false;
498
499                 root = node;
500                 lfta_list = lftas;
501
502                 // assign streamid
503                 _fta.ftaid = ftaid;
504                 _fta.ftaid.streamid = (gs_p_t)this;
505
506                 schema_handle = sch_handle;
507
508                 output_queue_mem = 0;
509
510                 // topologically sort the operators in the tree (or DAG)
511                 // for DAG we make sure we add the node to the sorted list only once
512                 operator_node* current_node;
513                 map<operator_node*, int> node_map;
514                 vector<operator_node*> node_list;
515
516                 int i = 0;
517                 node_list.push_back(root);
518                 node_map[root] = 0;
519
520                 num_operators = 1;
521
522                 while (i < node_list.size()) {
523                         current_node = node_list[i];
524                         if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
525                                 node_map[current_node->left_child] = num_operators++;
526                                 node_list.push_back(current_node->left_child);
527                         }
528                         if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
529                                 node_map[current_node->right_child] = num_operators++;
530                                 node_list.push_back(current_node->right_child);
531                         }
532                         i++;
533                 }
534                 num_operators = i;
535
536                 // build adjacency lists for query DAG
537                 list<int>* adj_lists = new list<int>[num_operators];
538                 bool* leaf_flags = new bool[num_operators];
539                 memset(leaf_flags, 0, num_operators * sizeof(bool));
540                 for (i = 0; i < num_operators; ++i) {
541                         current_node = node_list[i];
542                         if (current_node->left_child) {
543                                 adj_lists[i].push_back(node_map[current_node->left_child]);
544                         }
545                         if (current_node->right_child && current_node->left_child != current_node->right_child) {
546                                 adj_lists[i].push_back(node_map[current_node->right_child]);
547                         }
548                 }
549
550                 // run topolofical sort
551                 bool leaf_found = true;
552                 while (leaf_found) {
553                         leaf_found = false;
554                         // add all leafs to sorted_nodes
555                         for (i = 0; i < num_operators; ++i) {
556                                 if (!leaf_flags[i] && adj_lists[i].empty()) {
557                                         leaf_flags[i] = true;
558                                         sorted_nodes.push_back(node_list[i]);
559                                         leaf_found = true;
560
561                                         // remove the node from its parents adjecency lists
562                                         for (int j = 0; j < num_operators; ++j) {
563                                                 list<int>::iterator iter;
564                                                 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
565                                                         if (*iter == i) {
566                                                                 adj_lists[j].erase(iter);
567                                                                 break;
568                                                         }
569                                                 }
570                                         }
571                                 }
572                         }
573                 }
574
575                 delete[] adj_lists;
576                 delete[] leaf_flags;
577
578                 // set the parameter block for every operator in tree
579                 for (i = 0; i < num_operators; ++i)
580                         if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
581
582 #ifdef DEBUG
583                 fprintf(stderr,"Instantiate FTAs\n");
584 #endif
585                 /* extract lfta param block from hfta param block */
586 //                      NOTE: param_list must line up with lfta_list
587                 list<param_block> param_list;
588                 get_lfta_params(sz, value, param_list);
589                 list<param_block>::iterator iter1;
590                 list<lfta_info*>::iterator iter2 = lfta_list->begin();
591
592                 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
593                         lfta_info* inf = *iter2;
594
595                 #ifdef DEBUG
596                         fprintf(stderr,"Instantiate a FTA\n");
597                 #endif
598
599                         gs_uint32_t reuse_flag = 2;
600
601                         // we will try to create a new instance of child FTA only if it is parameterized
602                         if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
603                                 (*iter2)->f.streamid = 0;       // not interested in existing instances
604                                 reuse_flag = 0;
605                         }
606                         if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
607                                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
608                                 failed = true;
609                                 return;
610                         }
611
612                         free((*iter1).data);
613
614                         //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
615
616                         _fta.stream_subscribed[i]=(*iter2)->f;
617                 }
618                 _fta.stream_subscribed_cnt = i;
619
620                 num_eof_tuples = 0;
621
622                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
623                 _fta.free_fta = MULTOP_HFTA_free_fta;
624                 _fta.control_fta = MULTOP_HFTA_control_fta;
625                 _fta.accept_packet = MULTOP_HFTA_accept_packet;
626                 _fta.clock_fta = MULTOP_HFTA_clock_fta;
627
628                 // init runtime stats
629                 in_tuple_cnt = 0;
630                 out_tuple_cnt = 0;
631                 out_tuple_sz = 0;
632                 cycle_cnt = 0;
633
634         }
635
636         ~MULTOP_HFTA() {
637
638                 list<lfta_info*>::iterator iter;
639                 int i = 0;
640
641                 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
642                 delete *iter;
643                 }
644
645                 delete root;    // free operators memory
646                 delete lfta_list;
647
648
649      }
650
651         int flush() {
652
653                 list<host_tuple> res;
654
655                 // go through the list of operators in topological order
656                 // and flush them
657                 list<host_tuple>::iterator iter;
658                 list<host_tuple> temp_output_queue;
659
660                 for (int i = 0; i < num_operators; ++i) {
661                         operator_node* node = sorted_nodes[i];
662
663 #ifdef PLAN_DAG
664                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
665 #else
666                         // for trees we can put output tuples directly into parent's input buffer
667                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
668 #endif
669                         // consume tuples waiting in your queue
670                         for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
671                                 node->op->accept_tuple(*(iter), current_output_queue);
672                         }
673                         node->op->flush(current_output_queue);
674                         node->input_queue.clear();
675
676 #ifdef PLAN_DAG
677                         // copy the tuples from output queue into input queues of all parents
678                         list<operator_node*>::iterator node_iter;
679
680                         if (!node->parent_list.empty()) {
681                                 // append the content of the output queue to parent input queue
682
683                                 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
684                                         int* ref_cnt = 0;
685                                         if (node->parent_list.size() > 1) {
686                                                 ref_cnt = (int*)malloc(sizeof(int));
687                                                 *ref_cnt = node->parent_list.size() - 1;
688                                         }
689
690                                         for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
691                                                 (*iter).ref_cnt = ref_cnt;
692                                                 (*node_iter)->input_queue.push_back(*iter);
693                                         }
694                                 }
695                         }
696 #endif
697                 }
698
699                 if (!res.empty()) {
700                         // go through the list of returned tuples and finalyze them
701                         list<host_tuple>::iterator iter = res.begin();
702                         while (iter != res.end()) {
703                                 host_tuple& tup = *iter;
704
705                                 // finalize the tuple
706                                 if (tup.tuple_size)
707                                         finalize_tuple(tup);
708
709                                 output_queue_mem += tup.tuple_size;
710                                 iter++;
711                         }
712
713                         // append returned list to output_queue
714                         output_queue.splice(output_queue.end(), res);
715
716
717                         // post tuples
718                         while (!output_queue.empty()) {
719                                 host_tuple& tup = output_queue.front();
720                                 #ifdef DEBUG
721                                         fprintf(stderr, "HFTA::about to post tuple\n");
722                                 #endif
723                                 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
724                                         output_queue_mem -= tup.tuple_size;
725                                         tup.free_tuple();
726                                         output_queue.pop_front();
727                                 } else
728                                         break;
729                         }
730                 }
731
732                 return 0;
733         }
734
735         bool init_failed(){return failed;}
736 };
737
738
739 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
740         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor
741                                                                 // will be called on program exit
742
743         if (recursive) {
744                 // free instance we are subscribed to
745                 list<lfta_info*>::iterator iter;
746                 int i = 0;
747
748                 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
749                         fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
750                 }
751         }
752
753         delete ftap;
754         return 0;
755 }
756
757 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
758         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
759
760         if (command == FTA_COMMAND_FLUSH) {
761
762                 // ask lftas to do the flush
763                 list<lfta_info*>::iterator iter;
764                 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
765                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
766                 }
767                 // flush hfta operators
768                 ftap->flush();
769
770         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
771
772                 list<param_block> param_list;
773                 get_lfta_params(sz, value, param_list);
774
775                 // ask lftas to do the flush and set new parameters
776                 list<lfta_info*>::iterator iter;
777                 list<param_block>::iterator iter2;
778                 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
779                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
780                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
781                         free((*iter2).data);
782                 }
783                 // flush hfta operators
784                 ftap->flush();
785
786                 // set the new parameter block for every operator in tree
787                 for (int i = 0; i < ftap->num_operators; ++i)
788                         ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
789
790         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
791                 // we no longer use temp_status commands
792                 // hearbeat mechanism is used instead
793         }
794         return 0;
795 }
796
797 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
798         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
799
800         gs_uint64_t start_cycle = rdtsc();
801 #ifdef DEBUG
802         fprintf(stderr, "HFTA::accepted packet\n");
803 #endif
804         if (!length)     /* ignore null tuples */
805                 return 0;
806
807         ftap->in_tuple_cnt++;
808
809         host_tuple temp;
810         temp.tuple_size = length;
811         temp.data = packet;
812         temp.channel = 0;
813         temp.heap_resident = false;
814
815 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
816
817         // find from which lfta the tuple came
818         list<lfta_info*>::iterator iter;
819         lfta_info* inf = NULL;
820         int i;
821
822         for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
823                 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&
824                         ftap->_fta.stream_subscribed[i].port == ftaid->port &&
825                         ftap->_fta.stream_subscribed[i].index == ftaid->index &&
826                         ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
827                         inf = *iter;
828                         break;
829                 }
830         }
831
832         if (!inf) {
833                 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
834                 exit(1);
835         }
836
837         // route tuple through operator chain
838         list<host_tuple> result;
839         host_tuple tup;
840         int ret;
841         #ifndef PLAN_DAG
842                 temp.channel = inf->output_channel;
843         #endif
844         operator_node* current_node = NULL, *child = NULL;
845         list<host_tuple> temp_output_queue;
846
847
848         fta_stat* tup_trace = NULL;
849         gs_uint32_t tup_trace_sz = 0;
850         gs_uint64_t trace_id = 0;
851         bool temp_tuple_received = false;
852
853         // if the tuple is temporal we need to extract the heartbeat payload
854         if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
855                 temp_tuple_received = true;
856                 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
857                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
858         }
859
860         if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
861
862                 if (++ftap->num_eof_tuples < ftap->lfta_list->size())
863                         return 0;
864
865                 ftap->num_eof_tuples = 0;
866
867                 /* perform a flush  */
868                 ftap->flush();
869
870                 /* post eof_tuple to a parent */
871                 host_tuple eof_tuple;
872                 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
873
874                 /* last byte of the tuple specify the tuple type
875                  * set it to EOF_TUPLE
876                 */
877                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
878                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
879                 ftap->out_tuple_cnt++;
880                 ftap->out_tuple_sz+=eof_tuple.tuple_size;
881
882                 return 0;
883         }
884
885         list<host_tuple>::iterator iter2;
886
887 #ifdef PLAN_DAG
888
889         // push tuple to all parent operators of the lfta
890         list<operator_node*>::iterator node_iter;
891         list<unsigned>::iterator chan_iter;
892         for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
893                 temp.channel = *chan_iter;
894                 (*node_iter)->input_queue.push_back(temp);
895         }
896
897         for (i = 0; i < ftap->num_operators; ++i) {
898
899                 operator_node* node = ftap->sorted_nodes[i];
900                 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
901
902                 // consume tuples waiting in your queue
903                 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
904                         node->op->accept_tuple(*(iter2), current_output_queue);
905                 }
906                 node->input_queue.clear();
907
908                 // copy the tuples from output queue into input queues of all parents
909
910                 if (!node->parent_list.empty()) {
911
912                         // append the content of the output queue to parent input queue
913                         for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
914
915                                 int* ref_cnt = 0;
916                                 if (node->parent_list.size() > 1) {
917                                         ref_cnt = (int*)malloc(sizeof(int));
918                                         *ref_cnt = node->parent_list.size() - 1;
919                                 }
920
921                                 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
922                                         (*iter2).ref_cnt = ref_cnt;
923                                         (*iter2).channel = *chan_iter;
924                                         (*node_iter)->input_queue.push_back(*iter2);
925                                 }
926                         }
927                 }
928                 temp_output_queue.clear();
929         }
930 #else
931         current_node = inf->parent;
932
933 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
934         current_node->input_queue.push_back(temp);
935
936         do {
937 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
938                 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
939
940                 // consume tuples waiting in your queue
941                 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
942                         current_node->op->accept_tuple((*iter2),current_output_queue);
943                 }
944 //                      All consumed, delete them
945                 current_node->input_queue.clear();
946                 current_node = current_node->parent;
947
948         } while (current_node);
949 #endif
950
951
952         host_tuple temp_tup;
953
954         bool no_temp_tuple = false;
955
956         // if we received temporal tuple, last tuple of the result must be temporal too
957         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
958         if (temp_tuple_received) {
959
960                 if (result.empty()) {
961                         no_temp_tuple = true;
962
963                 } else {
964                         fta_stat stats;
965                         temp_tup = result.back();
966                         finalize_tuple(temp_tup);
967
968                         int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
969                         char* new_data = (char*)malloc(new_tuple_size);
970                         memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
971                         memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
972                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
973
974
975                         memset((char*)&stats, 0, sizeof(fta_stat));
976                         stats.ftaid = fta->ftaid;
977                         stats.in_tuple_cnt = ftap->in_tuple_cnt;
978                         stats.out_tuple_cnt = ftap->out_tuple_cnt;
979                         stats.out_tuple_sz = ftap->out_tuple_sz;
980                         stats.cycle_cnt = ftap->cycle_cnt;
981                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
982
983                         // Send a hearbeat message to clearinghouse.
984                         fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
985
986                         // reset the stats
987                         ftap->in_tuple_cnt = 0;
988                         ftap->out_tuple_cnt = 0;
989                         ftap->out_tuple_sz = 0;
990                         ftap->cycle_cnt = 0;
991
992                         free(temp_tup.data);
993                         temp_tup.data = new_data;
994                         temp_tup.tuple_size = new_tuple_size;
995                         result.pop_back();
996                 }
997         }
998
999         // go through the list of returned tuples and finalyze them
1000         // since we can produce multiple temporal tuples in DAG plans
1001         // we can drop all of them except the last one
1002         iter2 = result.begin();
1003         while(iter2 != result.end()) {
1004                 host_tuple tup = *iter2;
1005
1006                 // finalize the tuple
1007                 if (tup.tuple_size) {
1008                         finalize_tuple(tup);
1009
1010         #ifdef PLAN_DAG
1011                 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
1012                         tup.free_tuple();
1013                 else
1014         #endif
1015                         {
1016                         ftap->output_queue.push_back(tup);
1017                         ftap->output_queue_mem += tup.tuple_size;
1018                         }
1019
1020                 }
1021                 iter2++;
1022         }
1023
1024         // append returned list to output_queue
1025         // ftap->output_queue.splice(ftap->output_queue.end(), result);
1026
1027         if (temp_tuple_received && !no_temp_tuple) {
1028                 ftap->output_queue.push_back(temp_tup);
1029                 ftap->output_queue_mem += temp_tup.tuple_size;
1030         }
1031
1032         // post tuples
1033         while (!ftap->output_queue.empty()) {
1034                 host_tuple& tup = ftap->output_queue.front();
1035                 #ifdef DEBUG
1036                         fprintf(stderr, "HFTA::about to post tuple\n");
1037                 #endif
1038                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
1039                         ftap->out_tuple_cnt++;
1040                         ftap->out_tuple_sz+=tup.tuple_size;
1041                         ftap->output_queue_mem -= tup.tuple_size;
1042                         tup.free_tuple();
1043                         ftap->output_queue.pop_front();
1044                 } else
1045                         break;
1046         }
1047
1048         ftap->cycle_cnt += rdtsc() - start_cycle;
1049
1050         return 1;
1051 }
1052
1053 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1054         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
1055
1056 #ifdef HFTA_PROFILE
1057         /* Print stats */
1058         fprintf(stderr, "FTA = %s|", ftap->fta_name);
1059         fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1060         fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1061         fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1062         fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
1063
1064
1065                 fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1066                 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1067
1068                 for (int i = 1; i < ftap->num_operators; ++i) {
1069                         operator_node* node = ftap->sorted_nodes[i];
1070                         fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1071                         total_mem += node->op->get_mem_footprint();
1072                 }
1073                 fprintf(stderr, ", total = %d|", total_mem );
1074                 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
1075 #endif
1076
1077         fta_stat stats;
1078         memset((char*)&stats, 0, sizeof(fta_stat));
1079         stats.ftaid = fta->ftaid;
1080         stats.in_tuple_cnt = ftap->in_tuple_cnt;
1081         stats.out_tuple_cnt = ftap->out_tuple_cnt;
1082         stats.out_tuple_sz = ftap->out_tuple_sz;
1083         stats.cycle_cnt = ftap->cycle_cnt;
1084
1085         // Send a hearbeat message to clearinghouse.
1086         fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1087
1088         // resets runtime stats
1089         ftap->in_tuple_cnt = 0;
1090         ftap->out_tuple_cnt = 0;
1091         ftap->out_tuple_sz = 0;
1092         ftap->cycle_cnt = 0;
1093
1094         return 0;
1095 }
1096
1097
1098 #endif  // __HFTA_H
1099