f9e3c87b9fdd50c6a652d700968d04a30f223959
[com/gs-lite.git] / include / hfta / hfta.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef __HFTA_H
17 #define __HFTA_H
18
19 #include "gstypes.h"
20 #include "host_tuple.h"
21 #include "base_operator.h"
22 #include <vector>
23 #include <map>
24 #include<math.h>
25 #include "rdtsc.h"
26 using namespace std;
27
// Hash a 64-bit value, passed by pointer, by XOR-ing its low and high 32-bit halves.
#define hfta_ullong_hashfunc(p) ( ((int)(*(p))) ^ ((int)((*(p)) >> 32)) )
29
// min, max
// Each macro picks the smaller (MIN) or larger (MAX) operand and casts the
// result to the type named by its prefix. Every argument appears twice in the
// expansion, so an argument with side effects may be evaluated more than once.
#define ULLMIN(a,b) ((unsigned long long)((a) < (b) ? (a) : (b)))
#define ULLMAX(a,b) ((unsigned long long)((a) < (b) ? (b) : (a)))
#define LLMIN(a,b)  ((long long int)((a) < (b) ? (a) : (b)))
#define LLMAX(a,b)  ((long long int)((a) < (b) ? (b) : (a)))
#define UMIN(a,b)   ((unsigned int)((a) < (b) ? (a) : (b)))
#define UMAX(a,b)   ((unsigned int)((a) < (b) ? (b) : (a)))
#define LMIN(a,b)   ((int)((a) < (b) ? (a) : (b)))
#define LMAX(a,b)   ((int)((a) < (b) ? (b) : (a)))
#define FMIN(a,b)   ((double)((a) < (b) ? (a) : (b)))
#define FMAX(a,b)   ((double)((a) < (b) ? (b) : (a)))
41
// comparison
// Note the naming: GE is strictly-greater (>) and LE is strictly-less (<);
// GEQ and LEQ are the inclusive forms.
#define EQ(a,b)  ((a) == (b))
#define GEQ(a,b) ((a) >= (b))
#define GE(a,b)  ((a) > (b))
#define LEQ(a,b) ((a) <= (b))
#define LE(a,b)  ((a) < (b))
48
// if_else
// if_else_T(cond, then_val, else_val): yields then_val when cond is non-zero,
// otherwise else_val, cast to the type selected by the suffix.
#define if_else_f(cond,then_val,else_val)  ((double)((cond) != 0 ? (then_val) : (else_val)))
#define if_else_ll(cond,then_val,else_val) ((long long int)((cond) != 0 ? (then_val) : (else_val)))
#define if_else_ul(cond,then_val,else_val) ((unsigned long long)((cond) != 0 ? (then_val) : (else_val)))
#define if_else_u(cond,then_val,else_val)  ((unsigned int)((cond) != 0 ? (then_val) : (else_val)))
#define if_else_i(cond,then_val,else_val)  ((int)((cond) != 0 ? (then_val) : (else_val)))
55
//      Cast away temporality: expands to its argument unchanged.
#define non_temporal(v) (v)
58
//      endian swap
// Reverses the byte order of the low 32 bits of x. The masks are unsigned
// literals so that every shift is done on an unsigned value; with plain int
// masks a signed argument made ((x) & 0xFF) << 24 shift into the sign bit,
// which is undefined behaviour. x appears four times in the expansion, so do
// not pass an expression with side effects.
#define endian_swap_ui(x) ( (((x) & 0xFF000000u) >> 24) | (((x) & 0x00FF0000u) >> 8) | (((x) & 0x0000FF00u) << 8) | (((x) & 0x000000FFu) << 24) )
61
//      Access math libraries
// Each macro forwards to the <math.h> function of the same name (a macro is
// not re-expanded inside its own expansion, so these are pass-throughs).
// fmod takes two arguments; it was previously declared with one, which made
// every fmod(a,b) call fail to preprocess.
#define sqrt(x) sqrt(x)
#define pow(x,y) pow((x),(y))
#define sin(x) sin(x)
#define cos(x) cos(x)
#define tan(x) tan(x)
#define asin(x) asin(x)
#define acos(x) acos(x)
#define atan(x) atan(x)
#define log(x) log(x)
#define log2(x) log2(x)
#define log10(x) log10(x)
#define ceil(x) ceil(x)
#define floor(x) floor(x)
#define fmod(x,y) fmod((x),(y))
#define trunc(x) trunc(x)
78
79
80 extern "C" {
81 #include <lapp.h>
82 #include <fta.h>
83 #include <stdlib.h>
84 #include <stdio.h>
85 #include <schemaparser.h>
86 }
87
88 struct param_block {
89         gs_int32_t block_length;
90         void* data;
91 };
92
93 // forward declaration of operator_node
94 struct operator_node;
95
96 struct lfta_info {
97         gs_schemahandle_t schema_handle;
98         gs_sp_t schema;
99         gs_sp_t fta_name;
100 #ifdef PLAN_DAG
101         list<operator_node*> parent_list;
102         list<unsigned> out_channel_list;
103 #else
104         operator_node* parent;
105         unsigned output_channel;
106 #endif
107         FTAID f;
108
109         lfta_info() {
110                 schema_handle = -1;
111                 schema = NULL;
112                 #ifndef PLAN_DAG
113                         parent = NULL;
114                         output_channel = 0;
115                 #endif
116         }
117
118         ~lfta_info() {
119                 if (fta_name)
120                         free (fta_name);
121                 if (schema)
122                         free (schema);
123                 if (schema_handle >= 0)
124                         ftaschema_free(schema_handle);
125         }
126 };
127
128
129 struct operator_node {
130         base_operator* op;
131
132 #ifdef PLAN_DAG
133         list<operator_node*> parent_list;
134         list<unsigned> out_channel_list;
135 #else
136         operator_node* parent;
137 #endif
138
139         operator_node* left_child;
140         operator_node* right_child;
141         lfta_info* left_lfta;
142         lfta_info* right_lfta;
143
144         list<host_tuple> input_queue;
145
146         operator_node(base_operator* o) {
147                 op = o;
148                 #ifndef PLAN_DAG
149                         parent = NULL;
150                 #endif
151                 left_child = right_child = NULL;
152                 left_lfta = right_lfta = NULL;
153         }
154
155         ~operator_node() {
156                 delete op;
157         }
158
159         void set_left_child_node(operator_node* child) {
160                 left_child = child;
161                 if (child) {
162                         #ifdef PLAN_DAG
163                                 child->parent_list.push_back(this);
164                                 child->out_channel_list.push_back(0);
165                         #else
166                                 child->parent = this;
167                                 child->op->set_output_channel(0);
168                         #endif
169                 }
170         }
171
172         void set_right_child_node(operator_node* child) {
173                 right_child = child;
174                 if (child) {
175                         #ifdef PLAN_DAG
176                                 child->parent_list.push_back(this);
177                                 child->out_channel_list.push_back(1);
178                         #else
179                                 child->parent = this;
180                                 child->op->set_output_channel(1);
181                         #endif
182                 }
183         }
184
185         void set_left_lfta(lfta_info* l_lfta) {
186                 left_lfta = l_lfta;
187                 if (left_lfta) {
188                         #ifdef PLAN_DAG
189                                 left_lfta->parent_list.push_back(this);
190                                 left_lfta->out_channel_list.push_back(0);
191                         #else
192                                 left_lfta->parent = this;
193                                 left_lfta->output_channel = 0;
194                         #endif
195                 }
196         }
197
198         void set_right_lfta(lfta_info* r_lfta) {
199                 right_lfta = r_lfta;
200                 if (right_lfta) {
201                         #ifdef PLAN_DAG
202                                 right_lfta->parent_list.push_back(this);
203                                 right_lfta->out_channel_list.push_back(1);
204                         #else
205                                 right_lfta->parent = this;
206                                 right_lfta->output_channel = 1;
207                         #endif
208                 }
209         }
210
211 };
212
213
214
215 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
216 void finalize_tuple(host_tuple &tup);
217 void finalize_tuple(host_tuple &tup);
218 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
219 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
220 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
221 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
222
223 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
224 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
225 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
226 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
227
228 struct UNOP_HFTA {
229         struct FTA _fta;
230         base_operator* oper;
231         FTAID f;
232         bool failed;
233         gs_schemahandle_t schema_handle;
234
235         list<host_tuple> output_queue;
236
237         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
238         // lfta_name and an instance of the base_operator
239         // We don't need to know the output schema as this information is already embeded
240         // in create_output_tuple method of operators' functor.
241         UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
242                 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
243
244                 failed = false;
245                 oper = op;
246                 f = child_ftaid;
247                 schema_handle = sch_handle;
248
249                 // assign streamid
250                 _fta.ftaid = ftaid;
251                 _fta.ftaid.streamid = (gs_p_t)this;
252
253 #ifdef DEBUG
254                 fprintf(stderr,"Instantiate a FTA\n");
255 #endif
256                 /* extract lfta param block from hfta param block */
257                 list<param_block> param_list;
258                 get_lfta_params(sz, value, param_list);
259                 param_block param = param_list.front();
260
261
262         gs_uint32_t reuse_flag = 2;
263         // we will try to create a new instance of child FTA only if it is parameterized
264         if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
265                 f.streamid = 0; // not interested in existing instances
266                 reuse_flag = 0;
267                 }
268
269                 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
270                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
271                         failed = true;
272                     return;
273             }
274
275                 free(param.data);
276                 // set the operator's parameters
277                 if(oper->set_param_block(sz, (void*)value)) failed = true;;
278
279
280                 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
281                 _fta.stream_subscribed_cnt = 1;
282                 _fta.stream_subscribed[0] = f;
283
284                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
285                 _fta.free_fta = UNOP_HFTA_free_fta;
286                 _fta.control_fta = UNOP_HFTA_control_fta;
287                 _fta.accept_packet = UNOP_HFTA_accept_packet;
288                 _fta.clock_fta = UNOP_HFTA_clock_fta;
289         }
290
291         ~UNOP_HFTA() {
292          delete oper;    // free operators memory
293
294      }
295
296      int flush() {
297                 list<host_tuple> res;
298                 if (!oper->flush(res)) {
299
300                         if (!res.empty()) {
301                                 // go through the list of returned tuples and finalyze them
302                                 list<host_tuple>::iterator iter = res.begin();
303                                 while (iter != res.end()) {
304                                         host_tuple& tup = *iter;
305
306                                         // finalize the tuple
307                                         if (tup.tuple_size)
308                                                 finalize_tuple(tup);
309                                         iter++;
310                                 }
311
312                                 // append returned list to output_queue
313                                 output_queue.splice(output_queue.end(), res);
314
315
316                                 // post tuples
317                                 while (!output_queue.empty()) {
318                                         host_tuple& tup = output_queue.front();
319                                         #ifdef DEBUG
320                                                 fprintf(stderr, "HFTA::about to post tuple\n");
321                                         #endif
322                                         if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
323                                                 tup.free_tuple();
324                                                 output_queue.pop_front();
325                                         } else
326                                                 break;
327                                 }
328                         }
329                 }
330
331                 return 0;
332          }
333
334         bool init_failed(){return failed;}
335 };
336
337 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
338         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor
339                                                                 // will be called on program exit
340
341         if (recursive)
342                 // free instance we are subscribed to
343                 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
344
345         delete ftap;
346         return 0;
347 }
348
349 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
350         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
351
352         if (command == FTA_COMMAND_FLUSH) {
353                 // ask lfta to do the flush
354                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
355                 ftap->flush();
356
357         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
358                 // ask lfta to do the flush
359                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
360                 ftap->flush();
361
362                 /* extract lfta param block from hfta param block */
363                 list<param_block> param_list;
364                 get_lfta_params(sz, value, param_list);
365                 param_block param = param_list.front();
366                 // load new parameters into lfta
367                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
368                 free(param.data);
369
370                 // notify the operator about the change of parameter
371                 ftap->oper->set_param_block(sz, (void*)value);
372
373         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
374                 // we no longer use temp_status commands
375                 // hearbeat mechanism is used instead
376         }
377         return 0;
378 }
379
380 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
381         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
382 #ifdef DEBUG
383         fprintf(stderr, "HFTA::accepted packet\n");
384 #endif
385         if (!length)     /* ignore null tuples */
386                 return 0;
387
388         host_tuple temp;
389         temp.tuple_size = length;
390         temp.data = packet;
391         temp.channel = 0;
392         temp.heap_resident = false;
393
394         // pass the tuple to operator
395         list<host_tuple> res;
396         int ret;
397         fta_stat* tup_trace = NULL;
398         gs_uint32_t tup_trace_sz = 0;
399         gs_uint64_t trace_id = 0;
400         bool temp_tuple_received = false;
401
402
403         // if the tuple is temporal we need to extract the hearbeat payload
404         if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
405                 temp_tuple_received = true;
406                 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
407                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
408         }
409
410         if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
411                 /* perform a flush  */
412                 ftap->flush();
413
414                 /* post eof_tuple to a parent */
415                 host_tuple eof_tuple;
416                 ftap->oper->get_temp_status(eof_tuple);
417
418                 /* last byte of the tuple specifies the tuple type
419                  * set it to EOF_TUPLE
420                 */
421                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
422                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
423
424                 return 0;
425         }
426
427         ret = ftap->oper->accept_tuple(temp, res);
428
429         // go through the list of returned tuples and finalyze them
430         list<host_tuple>::iterator iter = res.begin();
431         while (iter != res.end()) {
432                 host_tuple& tup = *iter;
433
434                 // finalize the tuple
435                 if (tup.tuple_size)
436                         finalize_tuple(tup);
437                 iter++;
438         }
439
440         // if we received temporal tuple, last tuple of the result must be temporal too
441         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
442         if (temp_tuple_received) {
443                 fta_stat stats;
444                 host_tuple& temp_tup = res.back();
445
446
447                 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
448                 char* new_data = (char*)malloc(new_tuple_size);
449                 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
450                 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
451                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
452                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
453
454                 memset((char*)&stats, 0, sizeof(fta_stat));
455                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
456
457                 // Send a hearbeat message to clearinghouse.
458                 fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
459
460                 temp_tup.free_tuple();
461                 temp_tup.data = new_data;
462                 temp_tup.tuple_size = new_tuple_size;
463         }
464
465         // append returned list to output_queue
466         ftap->output_queue.splice(ftap->output_queue.end(), res);
467
468         // post tuples
469         while (!ftap->output_queue.empty()) {
470                 host_tuple& tup = ftap->output_queue.front();
471                 #ifdef DEBUG
472                         fprintf(stderr, "HFTA::about to post tuple\n");
473                 #endif
474                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
475                         tup.free_tuple();
476                         ftap->output_queue.pop_front();
477                 } else
478                         break;
479         }
480
481         return 1;
482 }
483
484 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
485
486         // Send a hearbeat message to clearinghouse.to indicate we are alive
487         fta_heartbeat(ftap->ftaid, 0, 0, 0);
488
489         return 0;
490 }
491
492
493 struct MULTOP_HFTA {
494         struct FTA _fta;
495         gs_csp_t fta_name;
496         gs_schemahandle_t schema_handle;
497         operator_node* root;
498         vector<operator_node*> sorted_nodes;
499         int num_operators;
500         list<lfta_info*> *lfta_list;
501         /* number of eof tuples we received so far
502          * receiving eof tuples from every source fta will cause a flush
503         */
504         int num_eof_tuples;
505
506         bool failed;
507         bool reusable;
508
509         list<host_tuple> output_queue;
510
511         // Runtime stats
512         gs_uint32_t in_tuple_cnt;
513         gs_uint32_t out_tuple_cnt;
514         gs_uint32_t out_tuple_sz;
515         gs_uint64_t cycle_cnt;
516
517         gs_uint64_t trace_id;
518
519         // memory occupied by output queue
520         gs_uint32_t output_queue_mem;
521
522
523         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
524         // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
525         // as the schema handle is already passed during operator creation time.
526         // We also don't need to know the output schema as this information is already embeded
527         // in create_output_tuple method of operators' functor.
528
529
530
531         MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
532                 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
533
534                 fta_name = name;
535                 failed = false;
536
537                 root = node;
538                 lfta_list = lftas;
539
540                 // assign streamid
541                 _fta.ftaid = ftaid;
542                 _fta.ftaid.streamid = (gs_p_t)this;
543
544                 schema_handle = sch_handle;
545
546                 output_queue_mem = 0;
547
548                 // topologically sort the operators in the tree (or DAG)
549                 // for DAG we make sure we add the node to the sorted list only once
550                 operator_node* current_node;
551                 map<operator_node*, int> node_map;
552                 vector<operator_node*> node_list;
553
554                 int i = 0;
555                 node_list.push_back(root);
556                 node_map[root] = 0;
557
558                 num_operators = 1;
559
560                 while (i < node_list.size()) {
561                         current_node = node_list[i];
562                         if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
563                                 node_map[current_node->left_child] = num_operators++;
564                                 node_list.push_back(current_node->left_child);
565                         }
566                         if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
567                                 node_map[current_node->right_child] = num_operators++;
568                                 node_list.push_back(current_node->right_child);
569                         }
570                         i++;
571                 }
572                 num_operators = i;
573
574                 // build adjacency lists for query DAG
575                 list<int>* adj_lists = new list<int>[num_operators];
576                 bool* leaf_flags = new bool[num_operators];
577                 memset(leaf_flags, 0, num_operators * sizeof(bool));
578                 for (i = 0; i < num_operators; ++i) {
579                         current_node = node_list[i];
580                         if (current_node->left_child) {
581                                 adj_lists[i].push_back(node_map[current_node->left_child]);
582                         }
583                         if (current_node->right_child && current_node->left_child != current_node->right_child) {
584                                 adj_lists[i].push_back(node_map[current_node->right_child]);
585                         }
586                 }
587
588                 // run topolofical sort
589                 bool leaf_found = true;
590                 while (leaf_found) {
591                         leaf_found = false;
592                         // add all leafs to sorted_nodes
593                         for (i = 0; i < num_operators; ++i) {
594                                 if (!leaf_flags[i] && adj_lists[i].empty()) {
595                                         leaf_flags[i] = true;
596                                         sorted_nodes.push_back(node_list[i]);
597                                         leaf_found = true;
598
599                                         // remove the node from its parents adjecency lists
600                                         for (int j = 0; j < num_operators; ++j) {
601                                                 list<int>::iterator iter;
602                                                 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
603                                                         if (*iter == i) {
604                                                                 adj_lists[j].erase(iter);
605                                                                 break;
606                                                         }
607                                                 }
608                                         }
609                                 }
610                         }
611                 }
612
613                 delete[] adj_lists;
614                 delete[] leaf_flags;
615
616                 // set the parameter block for every operator in tree
617                 for (i = 0; i < num_operators; ++i)
618                         if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
619
620 #ifdef DEBUG
621                 fprintf(stderr,"Instantiate FTAs\n");
622 #endif
623                 /* extract lfta param block from hfta param block */
624 //                      NOTE: param_list must line up with lfta_list
625                 list<param_block> param_list;
626                 get_lfta_params(sz, value, param_list);
627                 list<param_block>::iterator iter1;
628                 list<lfta_info*>::iterator iter2 = lfta_list->begin();
629
630                 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
631                         lfta_info* inf = *iter2;
632
633                 #ifdef DEBUG
634                         fprintf(stderr,"Instantiate a FTA\n");
635                 #endif
636
637                         gs_uint32_t reuse_flag = 2;
638
639                         // we will try to create a new instance of child FTA only if it is parameterized
640                         if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
641                                 (*iter2)->f.streamid = 0;       // not interested in existing instances
642                                 reuse_flag = 0;
643                         }
644                         if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
645                                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
646                                 failed = true;
647                                 return;
648                         }
649
650                         free((*iter1).data);
651
652                         //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
653
654                         _fta.stream_subscribed[i]=(*iter2)->f;
655                 }
656                 _fta.stream_subscribed_cnt = i;
657
658                 num_eof_tuples = 0;
659
660                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
661                 _fta.free_fta = MULTOP_HFTA_free_fta;
662                 _fta.control_fta = MULTOP_HFTA_control_fta;
663                 _fta.accept_packet = MULTOP_HFTA_accept_packet;
664                 _fta.clock_fta = MULTOP_HFTA_clock_fta;
665
666                 // init runtime stats
667                 in_tuple_cnt = 0;
668                 out_tuple_cnt = 0;
669                 out_tuple_sz = 0;
670                 cycle_cnt = 0;
671
672         }
673
674         ~MULTOP_HFTA() {
675
676                 list<lfta_info*>::iterator iter;
677                 int i = 0;
678
679                 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
680                 delete *iter;
681                 }
682
683                 delete root;    // free operators memory
684                 delete lfta_list;
685
686
687      }
688
689         int flush() {
690
691                 list<host_tuple> res;
692
693                 // go through the list of operators in topological order
694                 // and flush them
695                 list<host_tuple>::iterator iter;
696                 list<host_tuple> temp_output_queue;
697
698                 for (int i = 0; i < num_operators; ++i) {
699                         operator_node* node = sorted_nodes[i];
700
701 #ifdef PLAN_DAG
702                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
703 #else
704                         // for trees we can put output tuples directly into parent's input buffer
705                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
706 #endif
707                         // consume tuples waiting in your queue
708                         for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
709                                 node->op->accept_tuple(*(iter), current_output_queue);
710                         }
711                         node->op->flush(current_output_queue);
712                         node->input_queue.clear();
713
714 #ifdef PLAN_DAG
715                         // copy the tuples from output queue into input queues of all parents
716                         list<operator_node*>::iterator node_iter;
717
718                         if (!node->parent_list.empty()) {
719                                 // append the content of the output queue to parent input queue
720
721                                 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
722                                         int* ref_cnt = 0;
723                                         if (node->parent_list.size() > 1) {
724                                                 ref_cnt = (int*)malloc(sizeof(int));
725                                                 *ref_cnt = node->parent_list.size() - 1;
726                                         }
727
728                                         for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
729                                                 (*iter).ref_cnt = ref_cnt;
730                                                 (*node_iter)->input_queue.push_back(*iter);
731                                         }
732                                 }
733                         }
734 #endif
735                 }
736
737                 if (!res.empty()) {
738                         // go through the list of returned tuples and finalyze them
739                         list<host_tuple>::iterator iter = res.begin();
740                         while (iter != res.end()) {
741                                 host_tuple& tup = *iter;
742
743                                 // finalize the tuple
744                                 if (tup.tuple_size)
745                                         finalize_tuple(tup);
746
747                                 output_queue_mem += tup.tuple_size;
748                                 iter++;
749                         }
750
751                         // append returned list to output_queue
752                         output_queue.splice(output_queue.end(), res);
753
754
755                         // post tuples
756                         while (!output_queue.empty()) {
757                                 host_tuple& tup = output_queue.front();
758                                 #ifdef DEBUG
759                                         fprintf(stderr, "HFTA::about to post tuple\n");
760                                 #endif
761                                 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
762                                         output_queue_mem -= tup.tuple_size;
763                                         tup.free_tuple();
764                                         output_queue.pop_front();
765                                 } else
766                                         break;
767                         }
768                 }
769
770                 return 0;
771         }
772
773         bool init_failed(){return failed;}
774 };
775
776
777 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
778         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor
779                                                                 // will be called on program exit
780
781         if (recursive) {
782                 // free instance we are subscribed to
783                 list<lfta_info*>::iterator iter;
784                 int i = 0;
785
786                 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
787                         fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
788                 }
789         }
790
791         delete ftap;
792         return 0;
793 }
794
795 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
796         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
797
798         if (command == FTA_COMMAND_FLUSH) {
799
800                 // ask lftas to do the flush
801                 list<lfta_info*>::iterator iter;
802                 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
803                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
804                 }
805                 // flush hfta operators
806                 ftap->flush();
807
808         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
809
810                 list<param_block> param_list;
811                 get_lfta_params(sz, value, param_list);
812
813                 // ask lftas to do the flush and set new parameters
814                 list<lfta_info*>::iterator iter;
815                 list<param_block>::iterator iter2;
816                 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
817                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
818                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
819                         free((*iter2).data);
820                 }
821                 // flush hfta operators
822                 ftap->flush();
823
824                 // set the new parameter block for every operator in tree
825                 for (int i = 0; i < ftap->num_operators; ++i)
826                         ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
827
828         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
829                 // we no longer use temp_status commands
830                 // hearbeat mechanism is used instead
831         }
832         return 0;
833 }
834
835 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
836         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
837
838         gs_uint64_t start_cycle = rdtsc();
839 #ifdef DEBUG
840         fprintf(stderr, "HFTA::accepted packet\n");
841 #endif
842         if (!length)     /* ignore null tuples */
843                 return 0;
844
845         ftap->in_tuple_cnt++;
846
847         host_tuple temp;
848         temp.tuple_size = length;
849         temp.data = packet;
850         temp.channel = 0;
851         temp.heap_resident = false;
852
853 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
854
855         // find from which lfta the tuple came
856         list<lfta_info*>::iterator iter;
857         lfta_info* inf = NULL;
858         int i;
859
860         for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
861                 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&
862                         ftap->_fta.stream_subscribed[i].port == ftaid->port &&
863                         ftap->_fta.stream_subscribed[i].index == ftaid->index &&
864                         ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
865                         inf = *iter;
866                         break;
867                 }
868         }
869
870         if (!inf) {
871                 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
872                 exit(1);
873         }
874
875         // route tuple through operator chain
876         list<host_tuple> result;
877         host_tuple tup;
878         int ret;
879         #ifndef PLAN_DAG
880                 temp.channel = inf->output_channel;
881         #endif
882         operator_node* current_node = NULL, *child = NULL;
883         list<host_tuple> temp_output_queue;
884
885
886         fta_stat* tup_trace = NULL;
887         gs_uint32_t tup_trace_sz = 0;
888         gs_uint64_t trace_id = 0;
889         bool temp_tuple_received = false;
890
891         // if the tuple is temporal we need to extract the heartbeat payload
892         if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
893                 temp_tuple_received = true;
894                 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
895                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
896         }
897
898         if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
899
900                 if (++ftap->num_eof_tuples < ftap->lfta_list->size())
901                         return 0;
902
903                 ftap->num_eof_tuples = 0;
904
905                 /* perform a flush  */
906                 ftap->flush();
907
908                 /* post eof_tuple to a parent */
909                 host_tuple eof_tuple;
910                 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
911
912                 /* last byte of the tuple specify the tuple type
913                  * set it to EOF_TUPLE
914                 */
915                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
916                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
917                 ftap->out_tuple_cnt++;
918                 ftap->out_tuple_sz+=eof_tuple.tuple_size;
919
920                 return 0;
921         }
922
923         list<host_tuple>::iterator iter2;
924
925 #ifdef PLAN_DAG
926
927         // push tuple to all parent operators of the lfta
928         list<operator_node*>::iterator node_iter;
929         list<unsigned>::iterator chan_iter;
930         for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
931                 temp.channel = *chan_iter;
932                 (*node_iter)->input_queue.push_back(temp);
933         }
934
935         for (i = 0; i < ftap->num_operators; ++i) {
936
937                 operator_node* node = ftap->sorted_nodes[i];
938                 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
939
940                 // consume tuples waiting in your queue
941                 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
942                         node->op->accept_tuple(*(iter2), current_output_queue);
943                 }
944                 node->input_queue.clear();
945
946                 // copy the tuples from output queue into input queues of all parents
947
948                 if (!node->parent_list.empty()) {
949
950                         // append the content of the output queue to parent input queue
951                         for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
952
953                                 int* ref_cnt = 0;
954                                 if (node->parent_list.size() > 1) {
955                                         ref_cnt = (int*)malloc(sizeof(int));
956                                         *ref_cnt = node->parent_list.size() - 1;
957                                 }
958
959                                 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
960                                         (*iter2).ref_cnt = ref_cnt;
961                                         (*iter2).channel = *chan_iter;
962                                         (*node_iter)->input_queue.push_back(*iter2);
963                                 }
964                         }
965                 }
966                 temp_output_queue.clear();
967         }
968 #else
969         current_node = inf->parent;
970
971 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
972         current_node->input_queue.push_back(temp);
973
974         do {
975 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
976                 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
977
978                 // consume tuples waiting in your queue
979                 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
980                         current_node->op->accept_tuple((*iter2),current_output_queue);
981                 }
982 //                      All consumed, delete them
983                 current_node->input_queue.clear();
984                 current_node = current_node->parent;
985
986         } while (current_node);
987 #endif
988
989
990         host_tuple temp_tup;
991
992         bool no_temp_tuple = false;
993
994         // if we received temporal tuple, last tuple of the result must be temporal too
995         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
996         if (temp_tuple_received) {
997
998                 if (result.empty()) {
999                         no_temp_tuple = true;
1000
1001                 } else {
1002                         fta_stat stats;
1003                         temp_tup = result.back();
1004                         finalize_tuple(temp_tup);
1005
1006                         int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
1007                         char* new_data = (char*)malloc(new_tuple_size);
1008                         memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
1009                         memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
1010                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
1011
1012
1013                         memset((char*)&stats, 0, sizeof(fta_stat));
1014                         stats.ftaid = fta->ftaid;
1015                         stats.in_tuple_cnt = ftap->in_tuple_cnt;
1016                         stats.out_tuple_cnt = ftap->out_tuple_cnt;
1017                         stats.out_tuple_sz = ftap->out_tuple_sz;
1018                         stats.cycle_cnt = ftap->cycle_cnt;
1019                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
1020
1021                         // Send a hearbeat message to clearinghouse.
1022                         fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
1023
1024                         // reset the stats
1025                         ftap->in_tuple_cnt = 0;
1026                         ftap->out_tuple_cnt = 0;
1027                         ftap->out_tuple_sz = 0;
1028                         ftap->cycle_cnt = 0;
1029
1030                         free(temp_tup.data);
1031                         temp_tup.data = new_data;
1032                         temp_tup.tuple_size = new_tuple_size;
1033                         result.pop_back();
1034                 }
1035         }
1036
1037         // go through the list of returned tuples and finalyze them
1038         // since we can produce multiple temporal tuples in DAG plans
1039         // we can drop all of them except the last one
1040         iter2 = result.begin();
1041         while(iter2 != result.end()) {
1042                 host_tuple tup = *iter2;
1043
1044                 // finalize the tuple
1045                 if (tup.tuple_size) {
1046                         finalize_tuple(tup);
1047
1048         #ifdef PLAN_DAG
1049                 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
1050                         tup.free_tuple();
1051                 else
1052         #endif
1053                         {
1054                         ftap->output_queue.push_back(tup);
1055                         ftap->output_queue_mem += tup.tuple_size;
1056                         }
1057
1058                 }
1059                 iter2++;
1060         }
1061
1062         // append returned list to output_queue
1063         // ftap->output_queue.splice(ftap->output_queue.end(), result);
1064
1065         if (temp_tuple_received && !no_temp_tuple) {
1066                 ftap->output_queue.push_back(temp_tup);
1067                 ftap->output_queue_mem += temp_tup.tuple_size;
1068         }
1069
1070         // post tuples
1071         while (!ftap->output_queue.empty()) {
1072                 host_tuple& tup = ftap->output_queue.front();
1073                 #ifdef DEBUG
1074                         fprintf(stderr, "HFTA::about to post tuple\n");
1075                 #endif
1076                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
1077                         ftap->out_tuple_cnt++;
1078                         ftap->out_tuple_sz+=tup.tuple_size;
1079                         ftap->output_queue_mem -= tup.tuple_size;
1080                         tup.free_tuple();
1081                         ftap->output_queue.pop_front();
1082                 } else
1083                         break;
1084         }
1085
1086         ftap->cycle_cnt += rdtsc() - start_cycle;
1087
1088         return 1;
1089 }
1090
1091 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1092         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
1093
1094 #ifdef HFTA_PROFILE
1095         /* Print stats */
1096         fprintf(stderr, "FTA = %s|", ftap->fta_name);
1097         fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1098         fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1099         fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1100         fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
1101
1102
1103                 fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1104                 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1105
1106                 for (int i = 1; i < ftap->num_operators; ++i) {
1107                         operator_node* node = ftap->sorted_nodes[i];
1108                         fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1109                         total_mem += node->op->get_mem_footprint();
1110                 }
1111                 fprintf(stderr, ", total = %d|", total_mem );
1112                 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
1113 #endif
1114
1115         fta_stat stats;
1116         memset((char*)&stats, 0, sizeof(fta_stat));
1117         stats.ftaid = fta->ftaid;
1118         stats.in_tuple_cnt = ftap->in_tuple_cnt;
1119         stats.out_tuple_cnt = ftap->out_tuple_cnt;
1120         stats.out_tuple_sz = ftap->out_tuple_sz;
1121         stats.cycle_cnt = ftap->cycle_cnt;
1122
1123         // Send a hearbeat message to clearinghouse.
1124         fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1125
1126         // resets runtime stats
1127         ftap->in_tuple_cnt = 0;
1128         ftap->out_tuple_cnt = 0;
1129         ftap->out_tuple_sz = 0;
1130         ftap->cycle_cnt = 0;
1131
1132         return 0;
1133 }
1134
1135
1136 #endif  // __HFTA_H
1137