d3f5db4c5e87ba4237a1a880b6fd4bd1868f6e7c
[com/gs-lite.git] / include / hfta / hfta.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef __HFTA_H
17 #define __HFTA_H
18
19 #include "gstypes.h"
20 #include "host_tuple.h"
21 #include "base_operator.h"
22 #include <vector>
23 #include <map>
24 #include<math.h>
25 #include "rdtsc.h"
26 using namespace std;
27
// Hash of a 64-bit value reached through pointer x: the low 32 bits XORed
// with the high 32 bits.
#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))

// min, max
// NOTE: these are macros, so each argument may be evaluated more than once;
// do not pass expressions with side effects. Every expansion is wrapped in an
// outer pair of parentheses so the leading cast cannot bind to neighbouring
// tokens at the point of use.
#define ULLMIN(x,y) ((unsigned long long)((x)<(y)?(x):(y)))
#define ULLMAX(x,y) ((unsigned long long)((x)<(y)?(y):(x)))
#define LLMIN(x,y) ((long long int)((x)<(y)?(x):(y)))
#define LLMAX(x,y) ((long long int)((x)<(y)?(y):(x)))
#define UMIN(x,y) ((unsigned int)((x)<(y)?(x):(y)))
#define UMAX(x,y) ((unsigned int)((x)<(y)?(y):(x)))
#define LMIN(x,y) ((int)((x)<(y)?(x):(y)))
#define LMAX(x,y) ((int)((x)<(y)?(y):(x)))
#define FMIN(x,y) ((double)((x)<(y)?(x):(y)))
#define FMAX(x,y) ((double)((x)<(y)?(y):(x)))

// comparison
#define EQ(x,y) ((x)==(y))
#define GEQ(x,y) ((x)>=(y))
#define GE(x,y) ((x)>(y))
#define LEQ(x,y) ((x)<=(y))
#define LE(x,y) ((x)<(y))

// if_else: yields y when x is non-zero and z otherwise, cast to the type
// named by the suffix.
#define if_else_f(x,y,z) ((double)((x)==0?(z):(y)))
#define if_else_ll(x,y,z) ((long long int)((x)==0?(z):(y)))
#define if_else_ul(x,y,z) ((unsigned long long)((x)==0?(z):(y)))
#define if_else_u(x,y,z) ((unsigned int)((x)==0?(z):(y)))
#define if_else_i(x,y,z) ((int)((x)==0?(z):(y)))

//      Cast away temporality
#define non_temporal(x) (x)

//      endian swap of a 32-bit value.
//      The masks are unsigned so that the "<< 24" term is never a shift of a
//      signed int into its sign bit (undefined before C++20).
#define endian_swap_ui(x) ( (( (x) & 0xFF000000u) >> 24) | (( (x) & 0x00FF0000u) >> 8) | (( (x) & 0x0000FF00u) << 8) | (( (x) & 0x000000FFu) << 24) )
61
62 //      Access math libraries
// Each macro forwards to the <math.h> function of the same name (a
// function-like macro is not re-expanded inside its own replacement, so
// these are pass-throughs).
#define sqrt(x) sqrt(x)
#define pow(x,y) pow((x),(y))
#define sin(x) sin(x)
#define cos(x) cos(x)
#define tan(x) tan(x)
#define asin(x) asin(x)
#define acos(x) acos(x)
#define atan(x) atan(x)
#define log(x) log(x)
#define log2(x) log2(x)
#define log10(x) log10(x)
#define ceil(x) ceil(x)
#define floor(x) floor(x)
// fmod takes two operands; the former one-parameter macro made every real
// fmod(a,b) call fail to preprocess.
#define fmod(x,y) fmod((x),(y))
#define trunc(x) trunc(x)
78
79
80 extern "C" {
81 #include <lapp.h>
82 #include <fta.h>
83 #include <stdlib.h>
84 #include <stdio.h>
85 #include <schemaparser.h>
86 }
87
88 struct param_block {
89         gs_int32_t block_length;
90         void* data;
91 };
92
93 // forward declaration of operator_node
94 struct operator_node;
95
96 struct lfta_info {
97         gs_schemahandle_t schema_handle;
98         gs_sp_t schema;
99         gs_sp_t fta_name;
100 #ifdef PLAN_DAG
101         list<operator_node*> parent_list;
102         list<unsigned> out_channel_list;
103 #else
104         operator_node* parent;
105         unsigned output_channel;
106 #endif
107         FTAID f;
108
109         lfta_info() {
110                 schema_handle = -1;
111                 schema = NULL;
112                 #ifndef PLAN_DAG
113                         parent = NULL;
114                         output_channel = 0;
115                 #endif
116         }
117
118         ~lfta_info() {
119                 if (fta_name)
120                         free (fta_name);
121                 if (schema)
122                         free (schema);
123                 if (schema_handle >= 0)
124                         ftaschema_free(schema_handle);
125         }
126 };
127
128
129 struct operator_node {
130         base_operator* op;
131
132 #ifdef PLAN_DAG
133         list<operator_node*> parent_list;
134         list<unsigned> out_channel_list;
135 #else
136         operator_node* parent;
137 #endif
138
139         operator_node* left_child;
140         operator_node* right_child;
141         lfta_info* left_lfta;
142         lfta_info* right_lfta;
143
144         list<host_tuple> input_queue;
145
146         operator_node(base_operator* o) {
147                 op = o;
148                 #ifndef PLAN_DAG
149                         parent = NULL;
150                 #endif
151                 left_child = right_child = NULL;
152                 left_lfta = right_lfta = NULL;
153         }
154
155         ~operator_node() {
156                 delete op;
157         }
158
159         void set_left_child_node(operator_node* child) {
160                 left_child = child;
161                 if (child) {
162                         #ifdef PLAN_DAG
163                                 child->parent_list.push_back(this);
164                                 child->out_channel_list.push_back(0);
165                         #else
166                                 child->parent = this;
167                                 child->op->set_output_channel(0);
168                         #endif
169                 }
170         }
171
172         void set_right_child_node(operator_node* child) {
173                 right_child = child;
174                 if (child) {
175                         #ifdef PLAN_DAG
176                                 child->parent_list.push_back(this);
177                                 child->out_channel_list.push_back(1);
178                         #else
179                                 child->parent = this;
180                                 child->op->set_output_channel(1);
181                         #endif
182                 }
183         }
184
185         void set_left_lfta(lfta_info* l_lfta) {
186                 left_lfta = l_lfta;
187                 if (left_lfta) {
188                         #ifdef PLAN_DAG
189                                 left_lfta->parent_list.push_back(this);
190                                 left_lfta->out_channel_list.push_back(0);
191                         #else
192                                 left_lfta->parent = this;
193                                 left_lfta->output_channel = 0;
194                         #endif
195                 }
196         }
197
198         void set_right_lfta(lfta_info* r_lfta) {
199                 right_lfta = r_lfta;
200                 if (right_lfta) {
201                         #ifdef PLAN_DAG
202                                 right_lfta->parent_list.push_back(this);
203                                 right_lfta->out_channel_list.push_back(1);
204                         #else
205                                 right_lfta->parent = this;
206                                 right_lfta->output_channel = 1;
207                         #endif
208                 }
209         }
210
211 };
212
213
214
215 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
216 void finalize_tuple(host_tuple &tup);
217 void finalize_tuple(host_tuple &tup);
218 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
219 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
220 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
221 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
222
223 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
224 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
225 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
226 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
227
228 struct UNOP_HFTA {
229         struct FTA _fta;
230         base_operator* oper;
231         FTAID f;
232         bool failed;
233         gs_schemahandle_t schema_handle;
234
235         list<host_tuple> output_queue;
236
237         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
238         // lfta_name and an instance of the base_operator
239         // We don't need to know the output schema as this information is already embeded
240         // in create_output_tuple method of operators' functor.
241         UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
242                 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
243
244                 failed = false;
245                 oper = op;
246                 f = child_ftaid;
247                 schema_handle = sch_handle;
248
249                 // assign streamid
250                 _fta.ftaid = ftaid;
251                 _fta.ftaid.streamid = (gs_p_t)this;
252
253 #ifdef DEBUG
254                 fprintf(stderr,"Instantiate a FTA\n");
255 #endif
256                 /* extract lfta param block from hfta param block */
257                 list<param_block> param_list;
258                 get_lfta_params(sz, value, param_list);
259                 param_block param = param_list.front();
260
261
262         gs_uint32_t reuse_flag = 2;
263         // we will try to create a new instance of child FTA only if it is parameterized
264         if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
265                 f.streamid = 0; // not interested in existing instances
266                 reuse_flag = 0;
267                 }
268
269                 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
270                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
271                         failed = true;
272                     return;
273             }
274
275                 free(param.data);
276                 // set the operator's parameters
277                 if(oper->set_param_block(sz, (void*)value)) failed = true;;
278
279
280                 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
281                 _fta.stream_subscribed_cnt = 1;
282                 _fta.stream_subscribed[0] = f;
283
284                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
285                 _fta.free_fta = UNOP_HFTA_free_fta;
286                 _fta.control_fta = UNOP_HFTA_control_fta;
287                 _fta.accept_packet = UNOP_HFTA_accept_packet;
288                 _fta.clock_fta = UNOP_HFTA_clock_fta;
289         }
290
291         ~UNOP_HFTA() {
292          delete oper;    // free operators memory
293
294      }
295
296      int flush() {
297                 list<host_tuple> res;
298                 if (!oper->flush(res)) {
299
300                         if (!res.empty()) {
301                                 // go through the list of returned tuples and finalyze them
302                                 list<host_tuple>::iterator iter = res.begin();
303                                 while (iter != res.end()) {
304                                         host_tuple& tup = *iter;
305
306                                         // finalize the tuple
307                                         if (tup.tuple_size)
308                                                 finalize_tuple(tup);
309                                         iter++;
310                                 }
311
312                                 // append returned list to output_queue
313                                 output_queue.splice(output_queue.end(), res);
314
315
316                                 // post tuples
317                                 while (!output_queue.empty()) {
318                                         host_tuple& tup = output_queue.front();
319                                         #ifdef DEBUG
320                                                 fprintf(stderr, "HFTA::about to post tuple\n");
321                                         #endif
322                                         if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
323                                                 tup.free_tuple();
324                                                 output_queue.pop_front();
325                                         } else
326                                                 break;
327                                 }
328                         }
329                 }
330
331                 return 0;
332          }
333
334         bool init_failed(){return failed;}
335 };
336
337 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
338         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor
339                                                                 // will be called on program exit
340
341         if (recursive)
342                 // free instance we are subscribed to
343                 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
344
345         delete ftap;
346         return 0;
347 }
348
349 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
350         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
351
352         if (command == FTA_COMMAND_FLUSH) {
353                 // ask lfta to do the flush
354                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
355                 ftap->flush();
356
357         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
358                 // ask lfta to do the flush
359                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
360                 ftap->flush();
361
362                 /* extract lfta param block from hfta param block */
363                 list<param_block> param_list;
364                 get_lfta_params(sz, value, param_list);
365                 param_block param = param_list.front();
366                 // load new parameters into lfta
367                 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
368                 free(param.data);
369
370                 // notify the operator about the change of parameter
371                 ftap->oper->set_param_block(sz, (void*)value);
372
373         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
374                 // we no longer use temp_status commands
375                 // hearbeat mechanism is used instead
376         }
377         return 0;
378 }
379
380 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
381         UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
382 #ifdef DEBUG
383         fprintf(stderr, "HFTA::accepted packet\n");
384 #endif
385         if (!length)     /* ignore null tuples */
386                 return 0;
387
388         host_tuple temp;
389         temp.tuple_size = length;
390         temp.data = packet;
391         temp.channel = 0;
392         temp.heap_resident = false;
393
394         // pass the tuple to operator
395         list<host_tuple> res;
396         int ret;
397         fta_stat* tup_trace = NULL;
398         gs_uint32_t tup_trace_sz = 0;
399         gs_uint64_t trace_id = 0;
400         bool temp_tuple_received = false;
401
402
403         // if the tuple is temporal we need to extract the hearbeat payload
404         if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
405                 temp_tuple_received = true;
406                 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
407                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
408         }
409
410         if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
411                 /* perform a flush  */
412                 ftap->flush();
413
414                 /* post eof_tuple to a parent */
415                 host_tuple eof_tuple;
416                 ftap->oper->get_temp_status(eof_tuple);
417
418                 /* last byte of the tuple specifies the tuple type
419                  * set it to EOF_TUPLE
420                 */
421                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
422                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
423
424                 return 0;
425         }
426
427         ret = ftap->oper->accept_tuple(temp, res);
428
429         // go through the list of returned tuples and finalyze them
430         list<host_tuple>::iterator iter = res.begin();
431         while (iter != res.end()) {
432                 host_tuple& tup = *iter;
433
434                 // finalize the tuple
435                 if (tup.tuple_size)
436                         finalize_tuple(tup);
437                 iter++;
438         }
439
440         // if we received temporal tuple, last tuple of the result must be temporal too
441         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
442         if (temp_tuple_received) {
443                 fta_stat stats;
444                 host_tuple& temp_tup = res.back();
445
446
447                 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
448                 char* new_data = (char*)malloc(new_tuple_size);
449                 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
450                 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
451                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
452                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
453
454                 memset((char*)&stats, 0, sizeof(fta_stat));
455                 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
456
457                 // Send a hearbeat message to clearinghouse.
458                 // Disable sending heartbeats for now to avoid overloading clearinghouse
459                 // fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
460
461                 temp_tup.free_tuple();
462                 temp_tup.data = new_data;
463                 temp_tup.tuple_size = new_tuple_size;
464         }
465
466         // append returned list to output_queue
467         ftap->output_queue.splice(ftap->output_queue.end(), res);
468
469         // post tuples
470         while (!ftap->output_queue.empty()) {
471                 host_tuple& tup = ftap->output_queue.front();
472                 #ifdef DEBUG
473                         fprintf(stderr, "HFTA::about to post tuple\n");
474                 #endif
475                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
476                         tup.free_tuple();
477                         ftap->output_queue.pop_front();
478                 } else
479                         break;
480         }
481
482         return 1;
483 }
484
485 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
486
487         // Send a hearbeat message to clearinghouse.to indicate we are alive
488         // Disable sending heartbeats for now to avoid overloading clearinghouse
489         // fta_heartbeat(ftap->ftaid, 0, 0, 0);
490
491         return 0;
492 }
493
494
495 struct MULTOP_HFTA {
496         struct FTA _fta;
497         gs_csp_t fta_name;
498         gs_schemahandle_t schema_handle;
499         operator_node* root;
500         vector<operator_node*> sorted_nodes;
501         int num_operators;
502         list<lfta_info*> *lfta_list;
503         /* number of eof tuples we received so far
504          * receiving eof tuples from every source fta will cause a flush
505         */
506         int num_eof_tuples;
507
508         bool failed;
509         bool reusable;
510
511         list<host_tuple> output_queue;
512
513         // Runtime stats
514         gs_uint32_t in_tuple_cnt;
515         gs_uint32_t out_tuple_cnt;
516         gs_uint32_t out_tuple_sz;
517         gs_uint64_t cycle_cnt;
518
519         gs_uint64_t trace_id;
520
521         // memory occupied by output queue
522         gs_uint32_t output_queue_mem;
523
524
525         // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
526         // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
527         // as the schema handle is already passed during operator creation time.
528         // We also don't need to know the output schema as this information is already embeded
529         // in create_output_tuple method of operators' functor.
530
531
532
533         MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
534                 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
535
536                 fta_name = name;
537                 failed = false;
538
539                 root = node;
540                 lfta_list = lftas;
541
542                 // assign streamid
543                 _fta.ftaid = ftaid;
544                 _fta.ftaid.streamid = (gs_p_t)this;
545
546                 schema_handle = sch_handle;
547
548                 output_queue_mem = 0;
549
550                 // topologically sort the operators in the tree (or DAG)
551                 // for DAG we make sure we add the node to the sorted list only once
552                 operator_node* current_node;
553                 map<operator_node*, int> node_map;
554                 vector<operator_node*> node_list;
555
556                 int i = 0;
557                 node_list.push_back(root);
558                 node_map[root] = 0;
559
560                 num_operators = 1;
561
562                 while (i < node_list.size()) {
563                         current_node = node_list[i];
564                         if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
565                                 node_map[current_node->left_child] = num_operators++;
566                                 node_list.push_back(current_node->left_child);
567                         }
568                         if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
569                                 node_map[current_node->right_child] = num_operators++;
570                                 node_list.push_back(current_node->right_child);
571                         }
572                         i++;
573                 }
574                 num_operators = i;
575
576                 // build adjacency lists for query DAG
577                 list<int>* adj_lists = new list<int>[num_operators];
578                 bool* leaf_flags = new bool[num_operators];
579                 memset(leaf_flags, 0, num_operators * sizeof(bool));
580                 for (i = 0; i < num_operators; ++i) {
581                         current_node = node_list[i];
582                         if (current_node->left_child) {
583                                 adj_lists[i].push_back(node_map[current_node->left_child]);
584                         }
585                         if (current_node->right_child && current_node->left_child != current_node->right_child) {
586                                 adj_lists[i].push_back(node_map[current_node->right_child]);
587                         }
588                 }
589
590                 // run topolofical sort
591                 bool leaf_found = true;
592                 while (leaf_found) {
593                         leaf_found = false;
594                         // add all leafs to sorted_nodes
595                         for (i = 0; i < num_operators; ++i) {
596                                 if (!leaf_flags[i] && adj_lists[i].empty()) {
597                                         leaf_flags[i] = true;
598                                         sorted_nodes.push_back(node_list[i]);
599                                         leaf_found = true;
600
601                                         // remove the node from its parents adjecency lists
602                                         for (int j = 0; j < num_operators; ++j) {
603                                                 list<int>::iterator iter;
604                                                 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
605                                                         if (*iter == i) {
606                                                                 adj_lists[j].erase(iter);
607                                                                 break;
608                                                         }
609                                                 }
610                                         }
611                                 }
612                         }
613                 }
614
615                 delete[] adj_lists;
616                 delete[] leaf_flags;
617
618                 // set the parameter block for every operator in tree
619                 for (i = 0; i < num_operators; ++i)
620                         if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
621
622 #ifdef DEBUG
623                 fprintf(stderr,"Instantiate FTAs\n");
624 #endif
625                 /* extract lfta param block from hfta param block */
626 //                      NOTE: param_list must line up with lfta_list
627                 list<param_block> param_list;
628                 get_lfta_params(sz, value, param_list);
629                 list<param_block>::iterator iter1;
630                 list<lfta_info*>::iterator iter2 = lfta_list->begin();
631
632                 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
633                         lfta_info* inf = *iter2;
634
635                 #ifdef DEBUG
636                         fprintf(stderr,"Instantiate a FTA\n");
637                 #endif
638
639                         gs_uint32_t reuse_flag = 2;
640
641                         // we will try to create a new instance of child FTA only if it is parameterized
642                         if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
643                                 (*iter2)->f.streamid = 0;       // not interested in existing instances
644                                 reuse_flag = 0;
645                         }
646                         if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
647                                 fprintf(stderr,"HFTA::error:could instantiate a FTA");
648                                 failed = true;
649                                 return;
650                         }
651
652                         free((*iter1).data);
653
654                         //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
655
656                         _fta.stream_subscribed[i]=(*iter2)->f;
657                 }
658                 _fta.stream_subscribed_cnt = i;
659
660                 num_eof_tuples = 0;
661
662                 _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
663                 _fta.free_fta = MULTOP_HFTA_free_fta;
664                 _fta.control_fta = MULTOP_HFTA_control_fta;
665                 _fta.accept_packet = MULTOP_HFTA_accept_packet;
666                 _fta.clock_fta = MULTOP_HFTA_clock_fta;
667
668                 // init runtime stats
669                 in_tuple_cnt = 0;
670                 out_tuple_cnt = 0;
671                 out_tuple_sz = 0;
672                 cycle_cnt = 0;
673
674         }
675
676         ~MULTOP_HFTA() {
677
678                 list<lfta_info*>::iterator iter;
679                 int i = 0;
680
681                 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
682                 delete *iter;
683                 }
684
685                 delete root;    // free operators memory
686                 delete lfta_list;
687
688
689      }
690
691         int flush() {
692
693                 list<host_tuple> res;
694
695                 // go through the list of operators in topological order
696                 // and flush them
697                 list<host_tuple>::iterator iter;
698                 list<host_tuple> temp_output_queue;
699
700                 for (int i = 0; i < num_operators; ++i) {
701                         operator_node* node = sorted_nodes[i];
702
703 #ifdef PLAN_DAG
704                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
705 #else
706                         // for trees we can put output tuples directly into parent's input buffer
707                         list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
708 #endif
709                         // consume tuples waiting in your queue
710                         for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
711                                 node->op->accept_tuple(*(iter), current_output_queue);
712                         }
713                         node->op->flush(current_output_queue);
714                         node->input_queue.clear();
715
716 #ifdef PLAN_DAG
717                         // copy the tuples from output queue into input queues of all parents
718                         list<operator_node*>::iterator node_iter;
719
720                         if (!node->parent_list.empty()) {
721                                 // append the content of the output queue to parent input queue
722
723                                 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
724                                         int* ref_cnt = 0;
725                                         if (node->parent_list.size() > 1) {
726                                                 ref_cnt = (int*)malloc(sizeof(int));
727                                                 *ref_cnt = node->parent_list.size() - 1;
728                                         }
729
730                                         for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
731                                                 (*iter).ref_cnt = ref_cnt;
732                                                 (*node_iter)->input_queue.push_back(*iter);
733                                         }
734                                 }
735                         }
736 #endif
737                 }
738
739                 if (!res.empty()) {
740                         // go through the list of returned tuples and finalyze them
741                         list<host_tuple>::iterator iter = res.begin();
742                         while (iter != res.end()) {
743                                 host_tuple& tup = *iter;
744
745                                 // finalize the tuple
746                                 if (tup.tuple_size)
747                                         finalize_tuple(tup);
748
749                                 output_queue_mem += tup.tuple_size;
750                                 iter++;
751                         }
752
753                         // append returned list to output_queue
754                         output_queue.splice(output_queue.end(), res);
755
756
757                         // post tuples
758                         while (!output_queue.empty()) {
759                                 host_tuple& tup = output_queue.front();
760                                 #ifdef DEBUG
761                                         fprintf(stderr, "HFTA::about to post tuple\n");
762                                 #endif
763                                 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
764                                         output_queue_mem -= tup.tuple_size;
765                                         tup.free_tuple();
766                                         output_queue.pop_front();
767                                 } else
768                                         break;
769                         }
770                 }
771
772                 return 0;
773         }
774
775         bool init_failed(){return failed;}
776 };
777
778
779 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
780         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor
781                                                                 // will be called on program exit
782
783         if (recursive) {
784                 // free instance we are subscribed to
785                 list<lfta_info*>::iterator iter;
786                 int i = 0;
787
788                 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
789                         fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
790                 }
791         }
792
793         delete ftap;
794         return 0;
795 }
796
797 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
798         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
799
800         if (command == FTA_COMMAND_FLUSH) {
801
802                 // ask lftas to do the flush
803                 list<lfta_info*>::iterator iter;
804                 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
805                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
806                 }
807                 // flush hfta operators
808                 ftap->flush();
809
810         } else if (command == FTA_COMMAND_LOAD_PARAMS) {
811
812                 list<param_block> param_list;
813                 get_lfta_params(sz, value, param_list);
814
815                 // ask lftas to do the flush and set new parameters
816                 list<lfta_info*>::iterator iter;
817                 list<param_block>::iterator iter2;
818                 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
819                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
820                         fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
821                         free((*iter2).data);
822                 }
823                 // flush hfta operators
824                 ftap->flush();
825
826                 // set the new parameter block for every operator in tree
827                 for (int i = 0; i < ftap->num_operators; ++i)
828                         ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
829
830         } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
831                 // we no longer use temp_status commands
832                 // hearbeat mechanism is used instead
833         }
834         return 0;
835 }
836
837 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
838         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
839
840         gs_uint64_t start_cycle = rdtsc();
841 #ifdef DEBUG
842         fprintf(stderr, "HFTA::accepted packet\n");
843 #endif
844         if (!length)     /* ignore null tuples */
845                 return 0;
846
847         ftap->in_tuple_cnt++;
848
849         host_tuple temp;
850         temp.tuple_size = length;
851         temp.data = packet;
852         temp.channel = 0;
853         temp.heap_resident = false;
854
855 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
856
857         // find from which lfta the tuple came
858         list<lfta_info*>::iterator iter;
859         lfta_info* inf = NULL;
860         int i;
861
862         for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
863                 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&
864                         ftap->_fta.stream_subscribed[i].port == ftaid->port &&
865                         ftap->_fta.stream_subscribed[i].index == ftaid->index &&
866                         ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
867                         inf = *iter;
868                         break;
869                 }
870         }
871
872         if (!inf) {
873                 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
874                 exit(1);
875         }
876
877         // route tuple through operator chain
878         list<host_tuple> result;
879         host_tuple tup;
880         int ret;
881         #ifndef PLAN_DAG
882                 temp.channel = inf->output_channel;
883         #endif
884         operator_node* current_node = NULL, *child = NULL;
885         list<host_tuple> temp_output_queue;
886
887
888         fta_stat* tup_trace = NULL;
889         gs_uint32_t tup_trace_sz = 0;
890         gs_uint64_t trace_id = 0;
891         bool temp_tuple_received = false;
892
893         // if the tuple is temporal we need to extract the heartbeat payload
894         if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
895                 temp_tuple_received = true;
896                 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
897                         fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
898         }
899
900         if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
901
902                 if (++ftap->num_eof_tuples < ftap->lfta_list->size())
903                         return 0;
904
905                 ftap->num_eof_tuples = 0;
906
907                 /* perform a flush  */
908                 ftap->flush();
909
910                 /* post eof_tuple to a parent */
911                 host_tuple eof_tuple;
912                 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
913
914                 /* last byte of the tuple specify the tuple type
915                  * set it to EOF_TUPLE
916                 */
917                 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
918                 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
919                 ftap->out_tuple_cnt++;
920                 ftap->out_tuple_sz+=eof_tuple.tuple_size;
921
922                 return 0;
923         }
924
925         list<host_tuple>::iterator iter2;
926
927 #ifdef PLAN_DAG
928
929         // push tuple to all parent operators of the lfta
930         list<operator_node*>::iterator node_iter;
931         list<unsigned>::iterator chan_iter;
932         for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
933                 temp.channel = *chan_iter;
934                 (*node_iter)->input_queue.push_back(temp);
935         }
936
937         for (i = 0; i < ftap->num_operators; ++i) {
938
939                 operator_node* node = ftap->sorted_nodes[i];
940                 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
941
942                 // consume tuples waiting in your queue
943                 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
944                         node->op->accept_tuple(*(iter2), current_output_queue);
945                 }
946                 node->input_queue.clear();
947
948                 // copy the tuples from output queue into input queues of all parents
949
950                 if (!node->parent_list.empty()) {
951
952                         // append the content of the output queue to parent input queue
953                         for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
954
955                                 int* ref_cnt = 0;
956                                 if (node->parent_list.size() > 1) {
957                                         ref_cnt = (int*)malloc(sizeof(int));
958                                         *ref_cnt = node->parent_list.size() - 1;
959                                 }
960
961                                 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
962                                         (*iter2).ref_cnt = ref_cnt;
963                                         (*iter2).channel = *chan_iter;
964                                         (*node_iter)->input_queue.push_back(*iter2);
965                                 }
966                         }
967                 }
968                 temp_output_queue.clear();
969         }
970 #else
971         current_node = inf->parent;
972
973 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
974         current_node->input_queue.push_back(temp);
975
976         do {
977 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
978                 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
979
980                 // consume tuples waiting in your queue
981                 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
982                         current_node->op->accept_tuple((*iter2),current_output_queue);
983                 }
984 //                      All consumed, delete them
985                 current_node->input_queue.clear();
986                 current_node = current_node->parent;
987
988         } while (current_node);
989 #endif
990
991
992         host_tuple temp_tup;
993
994         bool no_temp_tuple = false;
995
996         // if we received temporal tuple, last tuple of the result must be temporal too
997         // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
998         if (temp_tuple_received) {
999
1000                 if (result.empty()) {
1001                         no_temp_tuple = true;
1002
1003                 } else {
1004                         fta_stat stats;
1005                         temp_tup = result.back();
1006                         finalize_tuple(temp_tup);
1007
1008                         int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
1009                         char* new_data = (char*)malloc(new_tuple_size);
1010                         memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
1011                         memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
1012                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
1013
1014
1015                         memset((char*)&stats, 0, sizeof(fta_stat));
1016                         stats.ftaid = fta->ftaid;
1017                         stats.in_tuple_cnt = ftap->in_tuple_cnt;
1018                         stats.out_tuple_cnt = ftap->out_tuple_cnt;
1019                         stats.out_tuple_sz = ftap->out_tuple_sz;
1020                         stats.cycle_cnt = ftap->cycle_cnt;
1021                         memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
1022
1023                         // Send a hearbeat message to clearinghouse.
1024                         // Disable sending heartbeats for now to avoid overloading clearinghouse
1025                         // fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
1026
1027                         // reset the stats
1028                         ftap->in_tuple_cnt = 0;
1029                         ftap->out_tuple_cnt = 0;
1030                         ftap->out_tuple_sz = 0;
1031                         ftap->cycle_cnt = 0;
1032
1033                         free(temp_tup.data);
1034                         temp_tup.data = new_data;
1035                         temp_tup.tuple_size = new_tuple_size;
1036                         result.pop_back();
1037                 }
1038         }
1039
1040         // go through the list of returned tuples and finalyze them
1041         // since we can produce multiple temporal tuples in DAG plans
1042         // we can drop all of them except the last one
1043         iter2 = result.begin();
1044         while(iter2 != result.end()) {
1045                 host_tuple tup = *iter2;
1046
1047                 // finalize the tuple
1048                 if (tup.tuple_size) {
1049                         finalize_tuple(tup);
1050
1051         #ifdef PLAN_DAG
1052                 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
1053                         tup.free_tuple();
1054                 else
1055         #endif
1056                         {
1057                         ftap->output_queue.push_back(tup);
1058                         ftap->output_queue_mem += tup.tuple_size;
1059                         }
1060
1061                 }
1062                 iter2++;
1063         }
1064
1065         // append returned list to output_queue
1066         // ftap->output_queue.splice(ftap->output_queue.end(), result);
1067
1068         if (temp_tuple_received && !no_temp_tuple) {
1069                 ftap->output_queue.push_back(temp_tup);
1070                 ftap->output_queue_mem += temp_tup.tuple_size;
1071         }
1072
1073         // post tuples
1074         while (!ftap->output_queue.empty()) {
1075                 host_tuple& tup = ftap->output_queue.front();
1076                 #ifdef DEBUG
1077                         fprintf(stderr, "HFTA::about to post tuple\n");
1078                 #endif
1079                 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
1080                         ftap->out_tuple_cnt++;
1081                         ftap->out_tuple_sz+=tup.tuple_size;
1082                         ftap->output_queue_mem -= tup.tuple_size;
1083                         tup.free_tuple();
1084                         ftap->output_queue.pop_front();
1085                 } else
1086                         break;
1087         }
1088
1089         ftap->cycle_cnt += rdtsc() - start_cycle;
1090
1091         return 1;
1092 }
1093
1094 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1095         MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
1096
1097 #ifdef HFTA_PROFILE
1098         /* Print stats */
1099         fprintf(stderr, "FTA = %s|", ftap->fta_name);
1100         fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1101         fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1102         fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1103         fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
1104
1105
1106                 fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1107                 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1108
1109                 for (int i = 1; i < ftap->num_operators; ++i) {
1110                         operator_node* node = ftap->sorted_nodes[i];
1111                         fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1112                         total_mem += node->op->get_mem_footprint();
1113                 }
1114                 fprintf(stderr, ", total = %d|", total_mem );
1115                 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
1116 #endif
1117
1118         fta_stat stats;
1119         memset((char*)&stats, 0, sizeof(fta_stat));
1120         stats.ftaid = fta->ftaid;
1121         stats.in_tuple_cnt = ftap->in_tuple_cnt;
1122         stats.out_tuple_cnt = ftap->out_tuple_cnt;
1123         stats.out_tuple_sz = ftap->out_tuple_sz;
1124         stats.cycle_cnt = ftap->cycle_cnt;
1125
1126         // Send a hearbeat message to clearinghouse.
1127         // Disable sending heartbeats for now to avoid overloading clearinghouse
1128         // fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1129
1130         // resets runtime stats
1131         ftap->in_tuple_cnt = 0;
1132         ftap->out_tuple_cnt = 0;
1133         ftap->out_tuple_sz = 0;
1134         ftap->cycle_cnt = 0;
1135
1136         return 0;
1137 }
1138
1139
1140 #endif  // __HFTA_H
1141