535b3759155a1d369b32a03b129c8f50adc61d4c
[com/gs-lite.git] / include / hfta / hfta.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef __HFTA_H
17 #define __HFTA_H
18
19 #include "gstypes.h"
20 #include "host_tuple.h"
21 #include "base_operator.h"
22 #include <vector>
23 #include <map>
24 #include "rdtsc.h"
25 using namespace std;
26
/* XOR-fold the 64-bit value pointed to by p into an int: (int)low ^ (int)(value >> 32). */
#define hfta_ullong_hashfunc(p) (((int)(*(p))) ^ ((int)((*(p)) >> 32)))

/* Type-generic min/max helpers. Each argument may be evaluated twice,
 * so do not pass expressions with side effects. */
#define LLMIN(a,b) ((a) < (b) ? (a) : (b))
#define LLMAX(a,b) ((a) < (b) ? (b) : (a))
#define UMIN(a,b) ((a) < (b) ? (a) : (b))
#define UMAX(a,b) ((a) < (b) ? (b) : (a))

/* Comparison helpers. */
#define EQ(a,b) ((a) == (b))
#define GEQ(a,b) ((a) >= (b))
#define LEQ(a,b) ((a) <= (b))

/* Cast away temporality: expands to its argument unchanged. */
#define non_temporal(v) (v)
37
38
39
40 extern "C" {
41 #include <lapp.h>
42 #include <fta.h>
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <schemaparser.h>
46 }
47
48 struct param_block {
49         gs_int32_t block_length;
50         void* data;
51 };
52
53 // forward declaration of operator_node
54 struct operator_node;
55
56 struct lfta_info {
57         gs_schemahandle_t schema_handle;
58         gs_sp_t schema;
59         gs_sp_t fta_name;
60 #ifdef PLAN_DAG
61         list<operator_node*> parent_list;
62         list<unsigned> out_channel_list;
63 #else
64         operator_node* parent;
65         unsigned output_channel;
66 #endif
67         FTAID f;
68
69         lfta_info() {
70                 schema_handle = -1;
71                 schema = NULL;
72                 #ifndef PLAN_DAG
73                         parent = NULL;
74                         output_channel = 0;
75                 #endif
76         }
77
78         ~lfta_info() {
79                 if (fta_name)
80                         free (fta_name);
81                 if (schema)
82                         free (schema);
83                 if (schema_handle >= 0)
84                         ftaschema_free(schema_handle);
85         }
86 };
87
88
89 struct operator_node {
90         base_operator* op;
91
92 #ifdef PLAN_DAG
93         list<operator_node*> parent_list;
94         list<unsigned> out_channel_list;
95 #else
96         operator_node* parent;
97 #endif
98
99         operator_node* left_child;
100         operator_node* right_child;
101         lfta_info* left_lfta;
102         lfta_info* right_lfta;
103
104         list<host_tuple> input_queue;
105
106         operator_node(base_operator* o) {
107                 op = o;
108                 #ifndef PLAN_DAG
109                         parent = NULL;
110                 #endif
111                 left_child = right_child = NULL;
112                 left_lfta = right_lfta = NULL;
113         }
114
115         ~operator_node() {
116                 delete op;
117         }
118
119         void set_left_child_node(operator_node* child) {
120                 left_child = child;
121                 if (child) {
122                         #ifdef PLAN_DAG
123                                 child->parent_list.push_back(this);
124                                 child->out_channel_list.push_back(0);
125                         #else
126                                 child->parent = this;
127                                 child->op->set_output_channel(0);
128                         #endif
129                 }
130         }
131
132         void set_right_child_node(operator_node* child) {
133                 right_child = child;
134                 if (child) {
135                         #ifdef PLAN_DAG
136                                 child->parent_list.push_back(this);
137                                 child->out_channel_list.push_back(1);
138                         #else
139                                 child->parent = this;
140                                 child->op->set_output_channel(1);
141                         #endif
142                 }
143         }
144
145         void set_left_lfta(lfta_info* l_lfta) {
146                 left_lfta = l_lfta;
147                 if (left_lfta) {
148                         #ifdef PLAN_DAG
149                                 left_lfta->parent_list.push_back(this);
150                                 left_lfta->out_channel_list.push_back(0);
151                         #else
152                                 left_lfta->parent = this;
153                                 left_lfta->output_channel = 0;
154                         #endif
155                 }
156         }
157
158         void set_right_lfta(lfta_info* r_lfta) {
159                 right_lfta = r_lfta;
160                 if (right_lfta) {
161                         #ifdef PLAN_DAG
162                                 right_lfta->parent_list.push_back(this);
163                                 right_lfta->out_channel_list.push_back(1);
164                         #else
165                                 right_lfta->parent = this;
166                                 right_lfta->output_channel = 1;
167                         #endif
168                 }
169         }
170
171 };
172
173
174
175 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
176 void finalize_tuple(host_tuple &tup);
177 void finalize_tuple(host_tuple &tup);
178 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
179 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
180 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
181 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
182
183 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
184 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
185 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
186 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
187
188 struct UNOP_HFTA {
189         struct FTA _fta;
190         base_operator* oper;
191         FTAID f;
192         bool failed;
193         gs_schemahandle_t schema_handle;
194
195         list<host_tuple> output_queue;
196
197         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
198         // lfta_name and an instance of the base_operator
199         // We don't need to know the output schema as this information is already embeded
200         // in create_output_tuple method of operators' functor.
201         UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
202                 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
203
204                 failed = false;
205                 oper = op;
206                 f = child_ftaid;
207                 schema_handle = sch_handle;
208
209                 // assign streamid
210                 _fta.ftaid = ftaid;
211                 _fta.ftaid.streamid = (gs_p_t)this;
212
213 #ifdef DEBUG
214                 fprintf(stderr,"Instantiate a FTA\n");
215 #endif
216                 /* extract lfta param block from hfta param block */
217                 list<param_block> param_list;
218                 get_lfta_params(sz, value, param_list);
219                 param_block param = param_list.front();
220
221
222         gs_uint32_t reuse_flag = 2;
223         // we will try to create a new instance of child FTA only if it is parameterized
224         if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
225                 f.streamid = 0; // not interested in existing instances
226                 reuse_flag = 0;
227                 }
228
229                 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
230                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
231                         failed = true;
232                     return;
233             }
234
235                 free(param.data);
236                 // set the operator's parameters
237                 if(oper->set_param_block(sz, (void*)value)) failed = true;;
238
239
240                 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
241                 _fta.stream_subscribed_cnt = 1;
242                 _fta.stream_subscribed[0] = f;
243
244                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
245                 _fta.free_fta = UNOP_HFTA_free_fta;
246                 _fta.control_fta = UNOP_HFTA_control_fta;
247                 _fta.accept_packet = UNOP_HFTA_accept_packet;
248                 _fta.clock_fta = UNOP_HFTA_clock_fta;
249         }
250
251         ~UNOP_HFTA() {
252          delete oper;    // free operators memory
253
254      }
255
256      int flush() {
257                 list<host_tuple> res;
258                 if (!oper->flush(res)) {
259
260                         if (!res.empty()) {
261                                 // go through the list of returned tuples and finalyze them
262                                 list<host_tuple>::iterator iter = res.begin();
263                                 while (iter != res.end()) {
264                                         host_tuple& tup = *iter;
265
266                                         // finalize the tuple
267                                         if (tup.tuple_size)
268                                                 finalize_tuple(tup);
269                                         iter++;
270                                 }
271
272                                 // append returned list to output_queue
273                                 output_queue.splice(output_queue.end(), res);
274
275
276                                 // post tuples
277                                 while (!output_queue.empty()) {
278                                         host_tuple& tup = output_queue.front();
279                                         #ifdef DEBUG
280                                                 fprintf(stderr, "HFTA::about to post tuple\n");
281                                         #endif
282                                         if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
283                                                 tup.free_tuple();
284                                                 output_queue.pop_front();
285                                         } else
286                                                 break;
287                                 }
288                         }
289                 }
290
291                 return 0;
292          }
293
294         bool init_failed(){return failed;}
295 };
296
297 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
298         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor
299                                                                 // will be called on program exit
300
301         if (recursive)
302                 // free instance we are subscribed to
303                 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
304
305         delete ftap;
306         return 0;
307 }
308
309 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
310         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
311
312         if (command == FTA_COMMAND_FLUSH) {
313                 // ask lfta to do the flush
314                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
315                 ftap->flush();
316
317         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
318                 // ask lfta to do the flush
319                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
320                 ftap->flush();
321
322                 /* extract lfta param block from hfta param block */
323                 list<param_block> param_list;
324                 get_lfta_params(sz, value, param_list);
325                 param_block param = param_list.front();
326                 // load new parameters into lfta
327                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
328                 free(param.data);
329
330                 // notify the operator about the change of parameter
331                 ftap->oper->set_param_block(sz, (void*)value);
332
333         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
334                 // we no longer use temp_status commands
335                 // hearbeat mechanism is used instead
336         }
337         return 0;
338 }
339
340 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
341         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
342 #ifdef DEBUG
343         fprintf(stderr, "HFTA::accepted packet\n");
344 #endif
345         if (!length)     /* ignore null tuples */
346                 return 0;
347
348         host_tuple temp;
349         temp.tuple_size = length;
350         temp.data = packet;
351         temp.channel = 0;
352         temp.heap_resident = false;
353
354         // pass the tuple to operator
355         list<host_tuple> res;
356         int ret;
357         fta_stat* tup_trace = NULL;
358         gs_uint32_t tup_trace_sz = 0;
359         gs_uint64_t trace_id = 0;
360         bool temp_tuple_received = false;
361
362
363         // if the tuple is temporal we need to extract the hearbeat payload
364         if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
365                 temp_tuple_received = true;
366                 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
367                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
368         }
369
370         if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
371                 /* perform a flush  */
372                 ftap->flush();
373
374                 /* post eof_tuple to a parent */
375                 host_tuple eof_tuple;
376                 ftap->oper->get_temp_status(eof_tuple);
377
378                 /* last byte of the tuple specifies the tuple type
379                  * set it to EOF_TUPLE
380                 */
381                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
382                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
383
384                 return 0;
385         }
386
387         ret = ftap->oper->accept_tuple(temp, res);
388
389         // go through the list of returned tuples and finalyze them
390         list<host_tuple>::iterator iter = res.begin();
391         while (iter != res.end()) {
392                 host_tuple& tup = *iter;
393
394                 // finalize the tuple
395                 if (tup.tuple_size)
396                         finalize_tuple(tup);
397                 iter++;
398         }
399
400         // if we received temporal tuple, last tuple of the result must be temporal too
401         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
402         if (temp_tuple_received) {
403                 fta_stat stats;
404                 host_tuple& temp_tup = res.back();
405
406
407                 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
408                 char* new_data = (char*)malloc(new_tuple_size);
409                 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
410                 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
411                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
412                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
413
414                 memset((char*)&stats, 0, sizeof(fta_stat));
415                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
416
417                 // Send a hearbeat message to clearinghouse.
418                 fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
419
420                 temp_tup.free_tuple();
421                 temp_tup.data = new_data;
422                 temp_tup.tuple_size = new_tuple_size;
423         }
424
425         // append returned list to output_queue
426         ftap->output_queue.splice(ftap->output_queue.end(), res);
427
428         // post tuples
429         while (!ftap->output_queue.empty()) {
430                 host_tuple& tup = ftap->output_queue.front();
431                 #ifdef DEBUG
432                         fprintf(stderr, "HFTA::about to post tuple\n");
433                 #endif
434                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
435                         tup.free_tuple();
436                         ftap->output_queue.pop_front();
437                 } else
438                         break;
439         }
440
441         return 1;
442 }
443
444 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
445
446         // Send a hearbeat message to clearinghouse.to indicate we are alive
447         fta_heartbeat(ftap->ftaid, 0, 0, 0);
448
449         return 0;
450 }
451
452
453 struct MULTOP_HFTA {
454         struct FTA _fta;
455         gs_csp_t fta_name;
456         gs_schemahandle_t schema_handle;
457         operator_node* root;
458         vector<operator_node*> sorted_nodes;
459         int num_operators;
460         list<lfta_info*> *lfta_list;
461         /* number of eof tuples we received so far
462          * receiving eof tuples from every source fta will cause a flush
463         */
464         int num_eof_tuples;
465
466         bool failed;
467         bool reusable;
468
469         list<host_tuple> output_queue;
470
471         // Runtime stats
472         gs_uint32_t in_tuple_cnt;
473         gs_uint32_t out_tuple_cnt;
474         gs_uint32_t out_tuple_sz;
475         gs_uint64_t cycle_cnt;
476
477         gs_uint64_t trace_id;
478
479         // memory occupied by output queue
480         gs_uint32_t output_queue_mem;
481
482
483         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
484         // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
485         // as the schema handle is already passed during operator creation time.
486         // We also don't need to know the output schema as this information is already embeded
487         // in create_output_tuple method of operators' functor.
488
489
490
491         MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
492                 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
493
494                 fta_name = name;
495                 failed = false;
496
497                 root = node;
498                 lfta_list = lftas;
499
500                 // assign streamid
501                 _fta.ftaid = ftaid;
502                 _fta.ftaid.streamid = (gs_p_t)this;
503
504                 schema_handle = sch_handle;
505
506                 output_queue_mem = 0;
507
508                 // topologically sort the operators in the tree (or DAG)
509                 // for DAG we make sure we add the node to the sorted list only once
510                 operator_node* current_node;
511                 map<operator_node*, int> node_map;
512                 vector<operator_node*> node_list;
513
514                 int i = 0;
515                 node_list.push_back(root);
516                 node_map[root] = 0;
517
518                 num_operators = 1;
519
520                 while (i < node_list.size()) {
521                         current_node = node_list[i];
522                         if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
523                                 node_map[current_node->left_child] = num_operators++;
524                                 node_list.push_back(current_node->left_child);
525                         }
526                         if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
527                                 node_map[current_node->right_child] = num_operators++;
528                                 node_list.push_back(current_node->right_child);
529                         }
530                         i++;
531                 }
532                 num_operators = i;
533
534                 // build adjacency lists for query DAG
535                 list<int>* adj_lists = new list<int>[num_operators];
536                 bool* leaf_flags = new bool[num_operators];
537                 memset(leaf_flags, 0, num_operators * sizeof(bool));
538                 for (i = 0; i < num_operators; ++i) {
539                         current_node = node_list[i];
540                         if (current_node->left_child) {
541                                 adj_lists[i].push_back(node_map[current_node->left_child]);
542                         }
543                         if (current_node->right_child && current_node->left_child != current_node->right_child) {
544                                 adj_lists[i].push_back(node_map[current_node->right_child]);
545                         }
546                 }
547
548                 // run topolofical sort
549                 bool leaf_found = true;
550                 while (leaf_found) {
551                         leaf_found = false;
552                         // add all leafs to sorted_nodes
553                         for (i = 0; i < num_operators; ++i) {
554                                 if (!leaf_flags[i] && adj_lists[i].empty()) {
555                                         leaf_flags[i] = true;
556                                         sorted_nodes.push_back(node_list[i]);
557                                         leaf_found = true;
558
559                                         // remove the node from its parents adjecency lists
560                                         for (int j = 0; j < num_operators; ++j) {
561                                                 list<int>::iterator iter;
562                                                 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
563                                                         if (*iter == i) {
564                                                                 adj_lists[j].erase(iter);
565                                                                 break;
566                                                         }
567                                                 }
568                                         }
569                                 }
570                         }
571                 }
572
573                 delete[] adj_lists;
574                 delete[] leaf_flags;
575
576                 // set the parameter block for every operator in tree
577                 for (i = 0; i < num_operators; ++i)
578                         if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
579
580 #ifdef DEBUG
581                 fprintf(stderr,"Instantiate FTAs\n");
582 #endif
583                 /* extract lfta param block from hfta param block */
584 //                      NOTE: param_list must line up with lfta_list
585                 list<param_block> param_list;
586                 get_lfta_params(sz, value, param_list);
587                 list<param_block>::iterator iter1;
588                 list<lfta_info*>::iterator iter2 = lfta_list->begin();
589
590                 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
591                         lfta_info* inf = *iter2;
592
593                 #ifdef DEBUG
594                         fprintf(stderr,"Instantiate a FTA\n");
595                 #endif
596
597                         gs_uint32_t reuse_flag = 2;
598
599                         // we will try to create a new instance of child FTA only if it is parameterized
600                         if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
601                                 (*iter2)->f.streamid = 0;       // not interested in existing instances
602                                 reuse_flag = 0;
603                         }
604                         if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
605                                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
606                                 failed = true;
607                                 return;
608                         }
609
610                         free((*iter1).data);
611
612                         //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
613
614                         _fta.stream_subscribed[i]=(*iter2)->f;
615                 }
616                 _fta.stream_subscribed_cnt = i;
617
618                 num_eof_tuples = 0;
619
620                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
621                 _fta.free_fta = MULTOP_HFTA_free_fta;
622                 _fta.control_fta = MULTOP_HFTA_control_fta;
623                 _fta.accept_packet = MULTOP_HFTA_accept_packet;
624                 _fta.clock_fta = MULTOP_HFTA_clock_fta;
625
626                 // init runtime stats
627                 in_tuple_cnt = 0;
628                 out_tuple_cnt = 0;
629                 out_tuple_sz = 0;
630                 cycle_cnt = 0;
631
632         }
633
634         ~MULTOP_HFTA() {
635
636                 list<lfta_info*>::iterator iter;
637                 int i = 0;
638
639                 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
640                 delete *iter;
641                 }
642
643                 delete root;    // free operators memory
644                 delete lfta_list;
645
646
647      }
648
649         int flush() {
650
651                 list<host_tuple> res;
652
653                 // go through the list of operators in topological order
654                 // and flush them
655                 list<host_tuple>::iterator iter;
656                 list<host_tuple> temp_output_queue;
657
658                 for (int i = 0; i < num_operators; ++i) {
659                         operator_node* node = sorted_nodes[i];
660
661 #ifdef PLAN_DAG
662                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
663 #else
664                         // for trees we can put output tuples directly into parent's input buffer
665                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
666 #endif
667                         // consume tuples waiting in your queue
668                         for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
669                                 node->op->accept_tuple(*(iter), current_output_queue);
670                         }
671                         node->op->flush(current_output_queue);
672                         node->input_queue.clear();
673
674 #ifdef PLAN_DAG
675                         // copy the tuples from output queue into input queues of all parents
676                         list<operator_node*>::iterator node_iter;
677
678                         if (!node->parent_list.empty()) {
679                                 // append the content of the output queue to parent input queue
680
681                                 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
682                                         int* ref_cnt = 0;
683                                         if (node->parent_list.size() > 1) {
684                                                 ref_cnt = (int*)malloc(sizeof(int));
685                                                 *ref_cnt = node->parent_list.size() - 1;
686                                         }
687
688                                         for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
689                                                 (*iter).ref_cnt = ref_cnt;
690                                                 (*node_iter)->input_queue.push_back(*iter);
691                                         }
692                                 }
693                         }
694 #endif
695                 }
696
697                 if (!res.empty()) {
698                         // go through the list of returned tuples and finalyze them
699                         list<host_tuple>::iterator iter = res.begin();
700                         while (iter != res.end()) {
701                                 host_tuple& tup = *iter;
702
703                                 // finalize the tuple
704                                 if (tup.tuple_size)
705                                         finalize_tuple(tup);
706
707                                 output_queue_mem += tup.tuple_size;
708                                 iter++;
709                         }
710
711                         // append returned list to output_queue
712                         output_queue.splice(output_queue.end(), res);
713
714
715                         // post tuples
716                         while (!output_queue.empty()) {
717                                 host_tuple& tup = output_queue.front();
718                                 #ifdef DEBUG
719                                         fprintf(stderr, "HFTA::about to post tuple\n");
720                                 #endif
721                                 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
722                                         output_queue_mem -= tup.tuple_size;
723                                         tup.free_tuple();
724                                         output_queue.pop_front();
725                                 } else
726                                         break;
727                         }
728                 }
729
730                 return 0;
731         }
732
733         bool init_failed(){return failed;}
734 };
735
736
737 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
738         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor
739                                                                 // will be called on program exit
740
741         if (recursive) {
742                 // free instance we are subscribed to
743                 list<lfta_info*>::iterator iter;
744                 int i = 0;
745
746                 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
747                         fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
748                 }
749         }
750
751         delete ftap;
752         return 0;
753 }
754
755 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
756         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
757
758         if (command == FTA_COMMAND_FLUSH) {
759
760                 // ask lftas to do the flush
761                 list<lfta_info*>::iterator iter;
762                 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
763                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
764                 }
765                 // flush hfta operators
766                 ftap->flush();
767
768         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
769
770                 list<param_block> param_list;
771                 get_lfta_params(sz, value, param_list);
772
773                 // ask lftas to do the flush and set new parameters
774                 list<lfta_info*>::iterator iter;
775                 list<param_block>::iterator iter2;
776                 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
777                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
778                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
779                         free((*iter2).data);
780                 }
781                 // flush hfta operators
782                 ftap->flush();
783
784                 // set the new parameter block for every operator in tree
785                 for (int i = 0; i < ftap->num_operators; ++i)
786                         ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
787
788         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
789                 // we no longer use temp_status commands
790                 // hearbeat mechanism is used instead
791         }
792         return 0;
793 }
794
// Accept one tuple from a subscribed lfta and push it through this hfta.
//
// Steps:
//   1. Ignore zero-length tuples (returns 0).
//   2. Find the lfta_info whose position in lfta_list matches the position of
//      *ftaid in _fta.stream_subscribed; exit(1) if there is no match.
//   3. If the tuple is temporal, extract its trace (trace_id + fta_stat array).
//   4. If it is an EOF tuple, count it. Once the count reaches
//      lfta_list->size(), flush all operators, post an EOF tuple built from the
//      last operator's temp status, and return 0.
//   5. Route the tuple through the operators (sorted_nodes order under
//      PLAN_DAG, otherwise the single parent chain), collecting the final
//      output in `result`.
//   6. For a temporal input, extend the last result tuple with the trace plus
//      this hfta's stats, send a heartbeat and reset the stats.
//   7. Finalize results, append them to ftap->output_queue and post as many
//      queued tuples as hfta_post_tuple accepts; the rest stay queued.
//
// Returns 1 on the normal path, 0 for ignored and EOF tuples.
gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
        MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;

        // cycle counter start; the elapsed cycles are added to ftap->cycle_cnt
        // only on the normal return path at the bottom (not on the early returns)
        gs_uint64_t start_cycle = rdtsc();
#ifdef DEBUG
        fprintf(stderr, "HFTA::accepted packet\n");
#endif
        if (!length)     /* ignore null tuples */
                return 0;

        ftap->in_tuple_cnt++;

        // Wrap the incoming buffer without copying it. heap_resident = false
        // presumably marks that `packet` is not owned by this hfta -- TODO confirm.
        host_tuple temp;
        temp.tuple_size = length;
        temp.data = packet;
        temp.channel = 0;
        temp.heap_resident = false;

// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);

        // find from which lfta the tuple came
        // (lfta_list and _fta.stream_subscribed are walked in lockstep, so the i-th
        //  subscription is assumed to correspond to the i-th lfta_info)
        // NOTE(review): the loop does not check iter != lfta_list->end(); it relies on
        // lfta_list holding at least stream_subscribed_cnt entries.
        list<lfta_info*>::iterator iter;
        lfta_info* inf = NULL;
        int i;

        for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
                if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&
                        ftap->_fta.stream_subscribed[i].port == ftaid->port &&
                        ftap->_fta.stream_subscribed[i].index == ftaid->index &&
                        ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
                        inf = *iter;
                        break;
                }
        }

        if (!inf) {
                fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
                exit(1);
        }

        // route tuple through operator chain
        // NOTE(review): the locals tup, ret and child below are never used.
        list<host_tuple> result;
        host_tuple tup;
        int ret;
        #ifndef PLAN_DAG
                temp.channel = inf->output_channel;
        #endif
        operator_node* current_node = NULL, *child = NULL;
        list<host_tuple> temp_output_queue;


        fta_stat* tup_trace = NULL;
        gs_uint32_t tup_trace_sz = 0;
        gs_uint64_t trace_id = 0;
        bool temp_tuple_received = false;

        // if the tuple is temporal we need to extract the heartbeat payload
        // (if ftaschema_get_trace fails, tup_trace/tup_trace_sz presumably keep their
        //  NULL/0 initial values -- TODO confirm)
        if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
                temp_tuple_received = true;
                if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
                        fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
        }

        // EOF handling: wait until lfta_list->size() EOF tuples have arrived
        // (one shared counter, not tracked per lfta), then flush and forward one EOF.
        if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {

                if (++ftap->num_eof_tuples < ftap->lfta_list->size())
                        return 0;

                ftap->num_eof_tuples = 0;

                /* perform a flush  */
                ftap->flush();

                /* post eof_tuple to a parent */
                // The EOF tuple is built from the last operator's temp status.
                // NOTE(review): unlike the output_queue loop at the end of this function,
                // the hfta_post_tuple result is ignored, eof_tuple is never free_tuple()'d,
                // and it is posted ahead of anything still waiting in ftap->output_queue.
                // Confirm who owns the get_temp_status buffer and the intended ordering.
                host_tuple eof_tuple;
                ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);

                /* last byte of the tuple specifies the tuple type
                 * set it to EOF_TUPLE
                */
                *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
                hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
                ftap->out_tuple_cnt++;
                ftap->out_tuple_sz+=eof_tuple.tuple_size;

                return 0;
        }

        list<host_tuple>::iterator iter2;

#ifdef PLAN_DAG

        // push tuple to all parent operators of the lfta
        list<operator_node*>::iterator node_iter;
        list<unsigned>::iterator chan_iter;
        for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
                temp.channel = *chan_iter;
                (*node_iter)->input_queue.push_back(temp);
        }

        // Run every operator in sorted_nodes order (presumably a topological order
        // of the plan). Only the last node writes to `result`; the others write to
        // temp_output_queue, which is fanned out to their parents below.
        for (i = 0; i < ftap->num_operators; ++i) {

                operator_node* node = ftap->sorted_nodes[i];
                list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;

                // consume tuples waiting in your queue
                for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
                        node->op->accept_tuple(*(iter2), current_output_queue);
                }
                node->input_queue.clear();

                // copy the tuples from output queue into input queues of all parents

                if (!node->parent_list.empty()) {

                        // append the content of the output queue to parent input queue
                        for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {

                                // With more than one parent, every copy shares one heap
                                // counter initialised to (number of parents - 1); presumably
                                // the extra references free_tuple must see -- TODO confirm.
                                int* ref_cnt = 0;
                                if (node->parent_list.size() > 1) {
                                        ref_cnt = (int*)malloc(sizeof(int));
                                        *ref_cnt = node->parent_list.size() - 1;
                                }

                                for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
                                        (*iter2).ref_cnt = ref_cnt;
                                        (*iter2).channel = *chan_iter;
                                        (*node_iter)->input_queue.push_back(*iter2);
                                }
                        }
                }
                temp_output_queue.clear();
        }
#else
        // Linear plan: start at the lfta's parent and walk up the parent chain.
        // Each node writes its output straight into its parent's input queue
        // (or into `result` for the node without a parent).
        current_node = inf->parent;

// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
        current_node->input_queue.push_back(temp);

        do {
//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
                list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;

                // consume tuples waiting in your queue
                for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
                        current_node->op->accept_tuple((*iter2),current_output_queue);
                }
//                      All consumed, delete them
                current_node->input_queue.clear();
                current_node = current_node->parent;

        } while (current_node);
#endif


        host_tuple temp_tup;

        bool no_temp_tuple = false;

        // if we received a temporal tuple, the last tuple of the result must be temporal too;
        // we need to extend the trace by adding our stats and send a heartbeat to the clearinghouse.
        // Layout of the extended tuple:
        //   [finalized tuple data][trace_id (gs_uint64_t)]
        //   [tup_trace_sz incoming fta_stat entries][one fta_stat for this hfta]
        if (temp_tuple_received) {

                if (result.empty()) {
                        no_temp_tuple = true;

                } else {
                        fta_stat stats;
                        temp_tup = result.back();
                        finalize_tuple(temp_tup);

                        // NOTE(review): the malloc result is not checked, and when tup_trace is
                        // NULL the third memcpy gets a NULL source (with size 0), which is
                        // formally undefined behaviour.
                        int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
                        char* new_data = (char*)malloc(new_tuple_size);
                        memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
                        memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
                        memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));


                        // append this hfta's own stats as the last trace entry
                        memset((char*)&stats, 0, sizeof(fta_stat));
                        stats.ftaid = fta->ftaid;
                        stats.in_tuple_cnt = ftap->in_tuple_cnt;
                        stats.out_tuple_cnt = ftap->out_tuple_cnt;
                        stats.out_tuple_sz = ftap->out_tuple_sz;
                        stats.cycle_cnt = ftap->cycle_cnt;
                        memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));

                        // Send a heartbeat message to clearinghouse.
                        // NOTE(review): the fta_stat* passed here points at offset
                        // tuple_size + 8 inside new_data, so it may be misaligned when
                        // tuple_size is not a multiple of fta_stat's alignment.
                        fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));

                        // reset the stats
                        ftap->in_tuple_cnt = 0;
                        ftap->out_tuple_cnt = 0;
                        ftap->out_tuple_sz = 0;
                        ftap->cycle_cnt = 0;

                        // Replace the tuple payload with the extended copy and remove the
                        // tuple from `result`; it is queued separately after the others below.
                        // NOTE(review): assumes finalize_tuple leaves temp_tup.data as a
                        // buffer that may be released with free() -- TODO confirm.
                        free(temp_tup.data);
                        temp_tup.data = new_data;
                        temp_tup.tuple_size = new_tuple_size;
                        result.pop_back();
                }
        }

        // go through the list of returned tuples and finalize them
        // since we can produce multiple temporal tuples in DAG plans
        // we can drop all of them except the last one
        // (zero-size tuples are skipped without being queued)
        iter2 = result.begin();
        while(iter2 != result.end()) {
                host_tuple tup = *iter2;

                // finalize the tuple
                if (tup.tuple_size) {
                        finalize_tuple(tup);

        #ifdef PLAN_DAG
                if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
                        tup.free_tuple();
                else
        #endif
                        {
                        ftap->output_queue.push_back(tup);
                        ftap->output_queue_mem += tup.tuple_size;
                        }

                }
                iter2++;
        }

        // append returned list to output_queue
        // ftap->output_queue.splice(ftap->output_queue.end(), result);

        // queue the extended temporal tuple last, after the regular results
        if (temp_tuple_received && !no_temp_tuple) {
                ftap->output_queue.push_back(temp_tup);
                ftap->output_queue_mem += temp_tup.tuple_size;
        }

        // post tuples; stop at the first tuple hfta_post_tuple rejects and leave it
        // (and everything queued behind it) in output_queue for a later call
        while (!ftap->output_queue.empty()) {
                host_tuple& tup = ftap->output_queue.front();
                #ifdef DEBUG
                        fprintf(stderr, "HFTA::about to post tuple\n");
                #endif
                if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
                        ftap->out_tuple_cnt++;
                        ftap->out_tuple_sz+=tup.tuple_size;
                        ftap->output_queue_mem -= tup.tuple_size;
                        tup.free_tuple();
                        ftap->output_queue.pop_front();
                } else
                        break;
        }

        ftap->cycle_cnt += rdtsc() - start_cycle;

        return 1;
}
1050
1051 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1052         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
1053
1054 #ifdef HFTA_PROFILE
1055         /* Print stats */
1056         fprintf(stderr, "FTA = %s|", ftap->fta_name);
1057         fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1058         fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1059         fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1060         fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
1061
1062
1063                 fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1064                 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1065
1066                 for (int i = 1; i < ftap->num_operators; ++i) {
1067                         operator_node* node = ftap->sorted_nodes[i];
1068                         fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1069                         total_mem += node->op->get_mem_footprint();
1070                 }
1071                 fprintf(stderr, ", total = %d|", total_mem );
1072                 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
1073 #endif
1074
1075         fta_stat stats;
1076         memset((char*)&stats, 0, sizeof(fta_stat));
1077         stats.ftaid = fta->ftaid;
1078         stats.in_tuple_cnt = ftap->in_tuple_cnt;
1079         stats.out_tuple_cnt = ftap->out_tuple_cnt;
1080         stats.out_tuple_sz = ftap->out_tuple_sz;
1081         stats.cycle_cnt = ftap->cycle_cnt;
1082
1083         // Send a hearbeat message to clearinghouse.
1084         fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1085
1086         // resets runtime stats
1087         ftap->in_tuple_cnt = 0;
1088         ftap->out_tuple_cnt = 0;
1089         ftap->out_tuple_sz = 0;
1090         ftap->cycle_cnt = 0;
1091
1092         return 0;
1093 }
1094
1095
1096 #endif  // __HFTA_H
1097