Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / hfta.h
index b95f77d..24d14e6 100644 (file)
-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
- ------------------------------------------- */
-
-#ifndef __HFTA_H
-#define __HFTA_H
-
-#include "gstypes.h"
-#include "host_tuple.h"
-#include "base_operator.h"
-#include <vector>
-#include <map>
-#include<math.h>
-#include "rdtsc.h"
-using namespace std;
-
-#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))
-#define LLMIN(x,y) ((x)<(y)?(x):(y))
-#define LLMAX(x,y) ((x)<(y)?(y):(x))
-#define UMIN(x,y) ((x)<(y)?(x):(y))
-#define UMAX(x,y) ((x)<(y)?(y):(x))
-#define EQ(x,y) ((x)==(y))
-#define GEQ(x,y) ((x)>=(y))
-#define LEQ(x,y) ((x)<=(y))
-//     Cast away temporality
-#define non_temporal(x)(x)
-//     Access math libraries
-#define sqrt(x) sqrt(x)
-
-
-extern "C" {
-#include <lapp.h>
-#include <fta.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <schemaparser.h>
-}
-
-struct param_block {
-       gs_int32_t block_length;
-       void* data;
-};
-
-// forward declaration of operator_node
-struct operator_node;
-
-struct lfta_info {
-       gs_schemahandle_t schema_handle;
-       gs_sp_t schema;
-       gs_sp_t fta_name;
-#ifdef PLAN_DAG
-       list<operator_node*> parent_list;
-       list<unsigned> out_channel_list;
-#else
-       operator_node* parent;
-       unsigned output_channel;
-#endif
-       FTAID f;
-
-       lfta_info() {
-               schema_handle = -1;
-               schema = NULL;
-               #ifndef PLAN_DAG
-                       parent = NULL;
-                       output_channel = 0;
-               #endif
-       }
-
-       ~lfta_info() {
-               if (fta_name)
-                       free (fta_name);
-               if (schema)
-                       free (schema);
-               if (schema_handle >= 0)
-                       ftaschema_free(schema_handle);
-       }
-};
-
-
-struct operator_node {
-       base_operator* op;
-
-#ifdef PLAN_DAG
-       list<operator_node*> parent_list;
-       list<unsigned> out_channel_list;
-#else
-       operator_node* parent;
-#endif
-
-       operator_node* left_child;
-       operator_node* right_child;
-       lfta_info* left_lfta;
-       lfta_info* right_lfta;
-
-       list<host_tuple> input_queue;
-
-       operator_node(base_operator* o) {
-               op = o;
-               #ifndef PLAN_DAG
-                       parent = NULL;
-               #endif
-               left_child = right_child = NULL;
-               left_lfta = right_lfta = NULL;
-       }
-
-       ~operator_node() {
-               delete op;
-       }
-
-       void set_left_child_node(operator_node* child) {
-               left_child = child;
-               if (child) {
-                       #ifdef PLAN_DAG
-                               child->parent_list.push_back(this);
-                               child->out_channel_list.push_back(0);
-                       #else
-                               child->parent = this;
-                               child->op->set_output_channel(0);
-                       #endif
-               }
-       }
-
-       void set_right_child_node(operator_node* child) {
-               right_child = child;
-               if (child) {
-                       #ifdef PLAN_DAG
-                               child->parent_list.push_back(this);
-                               child->out_channel_list.push_back(1);
-                       #else
-                               child->parent = this;
-                               child->op->set_output_channel(1);
-                       #endif
-               }
-       }
-
-       void set_left_lfta(lfta_info* l_lfta) {
-               left_lfta = l_lfta;
-               if (left_lfta) {
-                       #ifdef PLAN_DAG
-                               left_lfta->parent_list.push_back(this);
-                               left_lfta->out_channel_list.push_back(0);
-                       #else
-                               left_lfta->parent = this;
-                               left_lfta->output_channel = 0;
-                       #endif
-               }
-       }
-
-       void set_right_lfta(lfta_info* r_lfta) {
-               right_lfta = r_lfta;
-               if (right_lfta) {
-                       #ifdef PLAN_DAG
-                               right_lfta->parent_list.push_back(this);
-                               right_lfta->out_channel_list.push_back(1);
-                       #else
-                               right_lfta->parent = this;
-                               right_lfta->output_channel = 1;
-                       #endif
-               }
-       }
-
-};
-
-
-
-int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
-void finalize_tuple(host_tuple &tup);
-void finalize_tuple(host_tuple &tup);
-gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
-gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
-gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
-gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
-
-gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
-gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
-gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
-gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
-
-struct UNOP_HFTA {
-       struct FTA _fta;
-       base_operator* oper;
-       FTAID f;
-       bool failed;
-       gs_schemahandle_t schema_handle;
-
-       list<host_tuple> output_queue;
-
-       // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
-       // lfta_name and an instance of the base_operator
-       // We don't need to know the output schema as this information is already embeded
-       // in create_output_tuple method of operators' functor.
-       UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
-               gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
-
-               failed = false;
-               oper = op;
-               f = child_ftaid;
-               schema_handle = sch_handle;
-
-               // assign streamid
-               _fta.ftaid = ftaid;
-               _fta.ftaid.streamid = (gs_p_t)this;
-
-#ifdef DEBUG
-               fprintf(stderr,"Instantiate a FTA\n");
-#endif
-               /* extract lfta param block from hfta param block */
-               list<param_block> param_list;
-               get_lfta_params(sz, value, param_list);
-               param_block param = param_list.front();
-
-
-        gs_uint32_t reuse_flag = 2;
-        // we will try to create a new instance of child FTA only if it is parameterized
-        if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
-               f.streamid = 0; // not interested in existing instances
-               reuse_flag = 0;
-               }
-
-               if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
-               fprintf(stderr,"HFTA::error:could instantiate a FTA");
-                       failed = true;
-                   return;
-           }
-
-               free(param.data);
-               // set the operator's parameters
-               if(oper->set_param_block(sz, (void*)value)) failed = true;;
-
-
-               fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
-               _fta.stream_subscribed_cnt = 1;
-               _fta.stream_subscribed[0] = f;
-
-               _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
-               _fta.free_fta = UNOP_HFTA_free_fta;
-               _fta.control_fta = UNOP_HFTA_control_fta;
-               _fta.accept_packet = UNOP_HFTA_accept_packet;
-               _fta.clock_fta = UNOP_HFTA_clock_fta;
-       }
-
-       ~UNOP_HFTA() {
-         delete oper;    // free operators memory
-
-     }
-
-     int flush() {
-               list<host_tuple> res;
-               if (!oper->flush(res)) {
-
-                       if (!res.empty()) {
-                               // go through the list of returned tuples and finalyze them
-                               list<host_tuple>::iterator iter = res.begin();
-                               while (iter != res.end()) {
-                                       host_tuple& tup = *iter;
-
-                                       // finalize the tuple
-                                       if (tup.tuple_size)
-                                               finalize_tuple(tup);
-                                       iter++;
-                               }
-
-                               // append returned list to output_queue
-                               output_queue.splice(output_queue.end(), res);
-
-
-                               // post tuples
-                               while (!output_queue.empty()) {
-                                       host_tuple& tup = output_queue.front();
-                                       #ifdef DEBUG
-                                               fprintf(stderr, "HFTA::about to post tuple\n");
-                                       #endif
-                                       if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
-                                               tup.free_tuple();
-                                               output_queue.pop_front();
-                                       } else
-                                               break;
-                               }
-                       }
-               }
-
-               return 0;
-        }
-
-       bool init_failed(){return failed;}
-};
-
-gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
-       UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor
-                                                               // will be called on program exit
-
-       if (recursive)
-               // free instance we are subscribed to
-               fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
-
-       delete ftap;
-       return 0;
-}
-
-gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
-       UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
-
-       if (command == FTA_COMMAND_FLUSH) {
-               // ask lfta to do the flush
-               fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
-               ftap->flush();
-
-       } else if (command == FTA_COMMAND_LOAD_PARAMS) {
-               // ask lfta to do the flush
-               fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
-               ftap->flush();
-
-               /* extract lfta param block from hfta param block */
-               list<param_block> param_list;
-               get_lfta_params(sz, value, param_list);
-               param_block param = param_list.front();
-               // load new parameters into lfta
-               fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
-               free(param.data);
-
-               // notify the operator about the change of parameter
-               ftap->oper->set_param_block(sz, (void*)value);
-
-       } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
-               // we no longer use temp_status commands
-               // hearbeat mechanism is used instead
-       }
-       return 0;
-}
-
-gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
-       UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
-#ifdef DEBUG
-       fprintf(stderr, "HFTA::accepted packet\n");
-#endif
-       if (!length)     /* ignore null tuples */
-               return 0;
-
-       host_tuple temp;
-       temp.tuple_size = length;
-       temp.data = packet;
-       temp.channel = 0;
-       temp.heap_resident = false;
-
-       // pass the tuple to operator
-       list<host_tuple> res;
-       int ret;
-       fta_stat* tup_trace = NULL;
-       gs_uint32_t tup_trace_sz = 0;
-       gs_uint64_t trace_id = 0;
-       bool temp_tuple_received = false;
-
-
-       // if the tuple is temporal we need to extract the hearbeat payload
-       if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
-               temp_tuple_received = true;
-               if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
-                       fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
-       }
-
-       if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
-               /* perform a flush  */
-               ftap->flush();
-
-               /* post eof_tuple to a parent */
-               host_tuple eof_tuple;
-               ftap->oper->get_temp_status(eof_tuple);
-
-               /* last byte of the tuple specifies the tuple type
-                * set it to EOF_TUPLE
-               */
-               *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
-               hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
-
-               return 0;
-       }
-
-       ret = ftap->oper->accept_tuple(temp, res);
-
-       // go through the list of returned tuples and finalyze them
-       list<host_tuple>::iterator iter = res.begin();
-       while (iter != res.end()) {
-               host_tuple& tup = *iter;
-
-               // finalize the tuple
-               if (tup.tuple_size)
-                       finalize_tuple(tup);
-               iter++;
-       }
-
-       // if we received temporal tuple, last tuple of the result must be temporal too
-       // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
-       if (temp_tuple_received) {
-               fta_stat stats;
-               host_tuple& temp_tup = res.back();
-
-
-               int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
-               char* new_data = (char*)malloc(new_tuple_size);
-               memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
-               memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
-               memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
-               memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
-
-               memset((char*)&stats, 0, sizeof(fta_stat));
-               memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
-
-               // Send a hearbeat message to clearinghouse.
-               fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
-
-               temp_tup.free_tuple();
-               temp_tup.data = new_data;
-               temp_tup.tuple_size = new_tuple_size;
-       }
-
-       // append returned list to output_queue
-       ftap->output_queue.splice(ftap->output_queue.end(), res);
-
-       // post tuples
-       while (!ftap->output_queue.empty()) {
-               host_tuple& tup = ftap->output_queue.front();
-               #ifdef DEBUG
-                       fprintf(stderr, "HFTA::about to post tuple\n");
-               #endif
-               if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
-                       tup.free_tuple();
-                       ftap->output_queue.pop_front();
-               } else
-                       break;
-       }
-
-       return 1;
-}
-
-gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
-
-       // Send a hearbeat message to clearinghouse.to indicate we are alive
-       fta_heartbeat(ftap->ftaid, 0, 0, 0);
-
-       return 0;
-}
-
-
-struct MULTOP_HFTA {
-       struct FTA _fta;
-       gs_csp_t fta_name;
-       gs_schemahandle_t schema_handle;
-       operator_node* root;
-       vector<operator_node*> sorted_nodes;
-       int num_operators;
-       list<lfta_info*> *lfta_list;
-       /* number of eof tuples we received so far
-        * receiving eof tuples from every source fta will cause a flush
-       */
-       int num_eof_tuples;
-
-       bool failed;
-       bool reusable;
-
-       list<host_tuple> output_queue;
-
-       // Runtime stats
-       gs_uint32_t in_tuple_cnt;
-       gs_uint32_t out_tuple_cnt;
-       gs_uint32_t out_tuple_sz;
-       gs_uint64_t cycle_cnt;
-
-       gs_uint64_t trace_id;
-
-       // memory occupied by output queue
-       gs_uint32_t output_queue_mem;
-
-
-       // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
-       // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
-       // as the schema handle is already passed during operator creation time.
-       // We also don't need to know the output schema as this information is already embeded
-       // in create_output_tuple method of operators' functor.
-
-
-
-       MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
-               list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
-
-               fta_name = name;
-               failed = false;
-
-               root = node;
-               lfta_list = lftas;
-
-               // assign streamid
-               _fta.ftaid = ftaid;
-               _fta.ftaid.streamid = (gs_p_t)this;
-
-               schema_handle = sch_handle;
-
-               output_queue_mem = 0;
-
-               // topologically sort the operators in the tree (or DAG)
-               // for DAG we make sure we add the node to the sorted list only once
-               operator_node* current_node;
-               map<operator_node*, int> node_map;
-               vector<operator_node*> node_list;
-
-               int i = 0;
-               node_list.push_back(root);
-               node_map[root] = 0;
-
-               num_operators = 1;
-
-               while (i < node_list.size()) {
-                       current_node = node_list[i];
-                       if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
-                               node_map[current_node->left_child] = num_operators++;
-                               node_list.push_back(current_node->left_child);
-                       }
-                       if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
-                               node_map[current_node->right_child] = num_operators++;
-                               node_list.push_back(current_node->right_child);
-                       }
-                       i++;
-               }
-               num_operators = i;
-
-               // build adjacency lists for query DAG
-               list<int>* adj_lists = new list<int>[num_operators];
-               bool* leaf_flags = new bool[num_operators];
-               memset(leaf_flags, 0, num_operators * sizeof(bool));
-               for (i = 0; i < num_operators; ++i) {
-                       current_node = node_list[i];
-                       if (current_node->left_child) {
-                               adj_lists[i].push_back(node_map[current_node->left_child]);
-                       }
-                       if (current_node->right_child && current_node->left_child != current_node->right_child) {
-                               adj_lists[i].push_back(node_map[current_node->right_child]);
-                       }
-               }
-
-               // run topolofical sort
-               bool leaf_found = true;
-               while (leaf_found) {
-                       leaf_found = false;
-                       // add all leafs to sorted_nodes
-                       for (i = 0; i < num_operators; ++i) {
-                               if (!leaf_flags[i] && adj_lists[i].empty()) {
-                                       leaf_flags[i] = true;
-                                       sorted_nodes.push_back(node_list[i]);
-                                       leaf_found = true;
-
-                                       // remove the node from its parents adjecency lists
-                                       for (int j = 0; j < num_operators; ++j) {
-                                               list<int>::iterator iter;
-                                               for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
-                                                       if (*iter == i) {
-                                                               adj_lists[j].erase(iter);
-                                                               break;
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               delete[] adj_lists;
-               delete[] leaf_flags;
-
-               // set the parameter block for every operator in tree
-               for (i = 0; i < num_operators; ++i)
-                       if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
-
-#ifdef DEBUG
-               fprintf(stderr,"Instantiate FTAs\n");
-#endif
-               /* extract lfta param block from hfta param block */
-//                     NOTE: param_list must line up with lfta_list
-               list<param_block> param_list;
-               get_lfta_params(sz, value, param_list);
-               list<param_block>::iterator iter1;
-               list<lfta_info*>::iterator iter2 = lfta_list->begin();
-
-               for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
-                       lfta_info* inf = *iter2;
-
-               #ifdef DEBUG
-                       fprintf(stderr,"Instantiate a FTA\n");
-               #endif
-
-                       gs_uint32_t reuse_flag = 2;
-
-                       // we will try to create a new instance of child FTA only if it is parameterized
-                       if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
-                               (*iter2)->f.streamid = 0;       // not interested in existing instances
-                               reuse_flag = 0;
-                       }
-                       if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
-                               fprintf(stderr,"HFTA::error:could instantiate a FTA");
-                               failed = true;
-                               return;
-                       }
-
-                       free((*iter1).data);
-
-                       //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
-
-                       _fta.stream_subscribed[i]=(*iter2)->f;
-               }
-               _fta.stream_subscribed_cnt = i;
-
-               num_eof_tuples = 0;
-
-               _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)
-               _fta.free_fta = MULTOP_HFTA_free_fta;
-               _fta.control_fta = MULTOP_HFTA_control_fta;
-               _fta.accept_packet = MULTOP_HFTA_accept_packet;
-               _fta.clock_fta = MULTOP_HFTA_clock_fta;
-
-               // init runtime stats
-               in_tuple_cnt = 0;
-               out_tuple_cnt = 0;
-               out_tuple_sz = 0;
-               cycle_cnt = 0;
-
-       }
-
-       ~MULTOP_HFTA() {
-
-               list<lfta_info*>::iterator iter;
-               int i = 0;
-
-               for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
-               delete *iter;
-               }
-
-               delete root;    // free operators memory
-               delete lfta_list;
-
-
-     }
-
-       int flush() {
-
-               list<host_tuple> res;
-
-               // go through the list of operators in topological order
-               // and flush them
-               list<host_tuple>::iterator iter;
-               list<host_tuple> temp_output_queue;
-
-               for (int i = 0; i < num_operators; ++i) {
-                       operator_node* node = sorted_nodes[i];
-
-#ifdef PLAN_DAG
-                       list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
-#else
-                       // for trees we can put output tuples directly into parent's input buffer
-                       list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
-#endif
-                       // consume tuples waiting in your queue
-                       for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
-                               node->op->accept_tuple(*(iter), current_output_queue);
-                       }
-                       node->op->flush(current_output_queue);
-                       node->input_queue.clear();
-
-#ifdef PLAN_DAG
-                       // copy the tuples from output queue into input queues of all parents
-                       list<operator_node*>::iterator node_iter;
-
-                       if (!node->parent_list.empty()) {
-                               // append the content of the output queue to parent input queue
-
-                               for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
-                                       int* ref_cnt = 0;
-                                       if (node->parent_list.size() > 1) {
-                                               ref_cnt = (int*)malloc(sizeof(int));
-                                               *ref_cnt = node->parent_list.size() - 1;
-                                       }
-
-                                       for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
-                                               (*iter).ref_cnt = ref_cnt;
-                                               (*node_iter)->input_queue.push_back(*iter);
-                                       }
-                               }
-                       }
-#endif
-               }
-
-               if (!res.empty()) {
-                       // go through the list of returned tuples and finalyze them
-                       list<host_tuple>::iterator iter = res.begin();
-                       while (iter != res.end()) {
-                               host_tuple& tup = *iter;
-
-                               // finalize the tuple
-                               if (tup.tuple_size)
-                                       finalize_tuple(tup);
-
-                               output_queue_mem += tup.tuple_size;
-                               iter++;
-                       }
-
-                       // append returned list to output_queue
-                       output_queue.splice(output_queue.end(), res);
-
-
-                       // post tuples
-                       while (!output_queue.empty()) {
-                               host_tuple& tup = output_queue.front();
-                               #ifdef DEBUG
-                                       fprintf(stderr, "HFTA::about to post tuple\n");
-                               #endif
-                               if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
-                                       output_queue_mem -= tup.tuple_size;
-                                       tup.free_tuple();
-                                       output_queue.pop_front();
-                               } else
-                                       break;
-                       }
-               }
-
-               return 0;
-       }
-
-       bool init_failed(){return failed;}
-};
-
-
-gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
-       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor
-                                                               // will be called on program exit
-
-       if (recursive) {
-               // free instance we are subscribed to
-               list<lfta_info*>::iterator iter;
-               int i = 0;
-
-               for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
-                       fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
-               }
-       }
-
-       delete ftap;
-       return 0;
-}
-
-gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
-       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
-
-       if (command == FTA_COMMAND_FLUSH) {
-
-               // ask lftas to do the flush
-               list<lfta_info*>::iterator iter;
-               for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
-                       fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
-               }
-               // flush hfta operators
-               ftap->flush();
-
-       } else if (command == FTA_COMMAND_LOAD_PARAMS) {
-
-               list<param_block> param_list;
-               get_lfta_params(sz, value, param_list);
-
-               // ask lftas to do the flush and set new parameters
-               list<lfta_info*>::iterator iter;
-               list<param_block>::iterator iter2;
-               for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
-                       fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
-                       fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
-                       free((*iter2).data);
-               }
-               // flush hfta operators
-               ftap->flush();
-
-               // set the new parameter block for every operator in tree
-               for (int i = 0; i < ftap->num_operators; ++i)
-                       ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
-
-       } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
-               // we no longer use temp_status commands
-               // hearbeat mechanism is used instead
-       }
-       return 0;
-}
-
-gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
-       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
-
-       gs_uint64_t start_cycle = rdtsc();
-#ifdef DEBUG
-       fprintf(stderr, "HFTA::accepted packet\n");
-#endif
-       if (!length)     /* ignore null tuples */
-               return 0;
-
-       ftap->in_tuple_cnt++;
-
-       host_tuple temp;
-       temp.tuple_size = length;
-       temp.data = packet;
-       temp.channel = 0;
-       temp.heap_resident = false;
-
-// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
-
-       // find from which lfta the tuple came
-       list<lfta_info*>::iterator iter;
-       lfta_info* inf = NULL;
-       int i;
-
-       for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
-               if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&
-                       ftap->_fta.stream_subscribed[i].port == ftaid->port &&
-                       ftap->_fta.stream_subscribed[i].index == ftaid->index &&
-                       ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
-                       inf = *iter;
-                       break;
-               }
-       }
-
-       if (!inf) {
-               fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
-               exit(1);
-       }
-
-       // route tuple through operator chain
-       list<host_tuple> result;
-       host_tuple tup;
-       int ret;
-       #ifndef PLAN_DAG
-               temp.channel = inf->output_channel;
-       #endif
-       operator_node* current_node = NULL, *child = NULL;
-       list<host_tuple> temp_output_queue;
-
-
-       fta_stat* tup_trace = NULL;
-       gs_uint32_t tup_trace_sz = 0;
-       gs_uint64_t trace_id = 0;
-       bool temp_tuple_received = false;
-
-       // if the tuple is temporal we need to extract the heartbeat payload
-       if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
-               temp_tuple_received = true;
-               if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
-                       fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
-       }
-
-       if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
-
-               if (++ftap->num_eof_tuples < ftap->lfta_list->size())
-                       return 0;
-
-               ftap->num_eof_tuples = 0;
-
-               /* perform a flush  */
-               ftap->flush();
-
-               /* post eof_tuple to a parent */
-               host_tuple eof_tuple;
-               ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
-
-               /* last byte of the tuple specify the tuple type
-                * set it to EOF_TUPLE
-               */
-               *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
-               hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
-               ftap->out_tuple_cnt++;
-               ftap->out_tuple_sz+=eof_tuple.tuple_size;
-
-               return 0;
-       }
-
-       list<host_tuple>::iterator iter2;
-
-#ifdef PLAN_DAG
-
-       // push tuple to all parent operators of the lfta
-       list<operator_node*>::iterator node_iter;
-       list<unsigned>::iterator chan_iter;
-       for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
-               temp.channel = *chan_iter;
-               (*node_iter)->input_queue.push_back(temp);
-       }
-
-       for (i = 0; i < ftap->num_operators; ++i) {
-
-               operator_node* node = ftap->sorted_nodes[i];
-               list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
-
-               // consume tuples waiting in your queue
-               for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
-                       node->op->accept_tuple(*(iter2), current_output_queue);
-               }
-               node->input_queue.clear();
-
-               // copy the tuples from output queue into input queues of all parents
-
-               if (!node->parent_list.empty()) {
-
-                       // append the content of the output queue to parent input queue
-                       for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
-
-                               int* ref_cnt = 0;
-                               if (node->parent_list.size() > 1) {
-                                       ref_cnt = (int*)malloc(sizeof(int));
-                                       *ref_cnt = node->parent_list.size() - 1;
-                               }
-
-                               for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
-                                       (*iter2).ref_cnt = ref_cnt;
-                                       (*iter2).channel = *chan_iter;
-                                       (*node_iter)->input_queue.push_back(*iter2);
-                               }
-                       }
-               }
-               temp_output_queue.clear();
-       }
-#else
-       current_node = inf->parent;
-
-// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
-       current_node->input_queue.push_back(temp);
-
-       do {
-//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
-               list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
-
-               // consume tuples waiting in your queue
-               for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
-                       current_node->op->accept_tuple((*iter2),current_output_queue);
-               }
-//                     All consumed, delete them
-               current_node->input_queue.clear();
-               current_node = current_node->parent;
-
-       } while (current_node);
-#endif
-
-
-       host_tuple temp_tup;
-
-       bool no_temp_tuple = false;
-
-       // if we received temporal tuple, last tuple of the result must be temporal too
-       // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
-       if (temp_tuple_received) {
-
-               if (result.empty()) {
-                       no_temp_tuple = true;
-
-               } else {
-                       fta_stat stats;
-                       temp_tup = result.back();
-                       finalize_tuple(temp_tup);
-
-                       int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
-                       char* new_data = (char*)malloc(new_tuple_size);
-                       memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
-                       memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
-                       memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
-
-
-                       memset((char*)&stats, 0, sizeof(fta_stat));
-                       stats.ftaid = fta->ftaid;
-                       stats.in_tuple_cnt = ftap->in_tuple_cnt;
-                       stats.out_tuple_cnt = ftap->out_tuple_cnt;
-                       stats.out_tuple_sz = ftap->out_tuple_sz;
-                       stats.cycle_cnt = ftap->cycle_cnt;
-                       memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
-
-                       // Send a hearbeat message to clearinghouse.
-                       fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
-
-                       // reset the stats
-                       ftap->in_tuple_cnt = 0;
-                       ftap->out_tuple_cnt = 0;
-                       ftap->out_tuple_sz = 0;
-                       ftap->cycle_cnt = 0;
-
-                       free(temp_tup.data);
-                       temp_tup.data = new_data;
-                       temp_tup.tuple_size = new_tuple_size;
-                       result.pop_back();
-               }
-       }
-
-       // go through the list of returned tuples and finalyze them
-       // since we can produce multiple temporal tuples in DAG plans
-       // we can drop all of them except the last one
-       iter2 = result.begin();
-       while(iter2 != result.end()) {
-               host_tuple tup = *iter2;
-
-               // finalize the tuple
-               if (tup.tuple_size) {
-                       finalize_tuple(tup);
-
-       #ifdef PLAN_DAG
-               if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
-                       tup.free_tuple();
-               else
-       #endif
-                       {
-                       ftap->output_queue.push_back(tup);
-                       ftap->output_queue_mem += tup.tuple_size;
-                       }
-
-               }
-               iter2++;
-       }
-
-       // append returned list to output_queue
-       // ftap->output_queue.splice(ftap->output_queue.end(), result);
-
-       if (temp_tuple_received && !no_temp_tuple) {
-               ftap->output_queue.push_back(temp_tup);
-               ftap->output_queue_mem += temp_tup.tuple_size;
-       }
-
-       // post tuples
-       while (!ftap->output_queue.empty()) {
-               host_tuple& tup = ftap->output_queue.front();
-               #ifdef DEBUG
-                       fprintf(stderr, "HFTA::about to post tuple\n");
-               #endif
-               if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
-                       ftap->out_tuple_cnt++;
-                       ftap->out_tuple_sz+=tup.tuple_size;
-                       ftap->output_queue_mem -= tup.tuple_size;
-                       tup.free_tuple();
-                       ftap->output_queue.pop_front();
-               } else
-                       break;
-       }
-
-       ftap->cycle_cnt += rdtsc() - start_cycle;
-
-       return 1;
-}
-
-gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
-       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
-
-#ifdef HFTA_PROFILE
-        /* Print stats */
-        fprintf(stderr, "FTA = %s|", ftap->fta_name);
-        fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
-        fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
-        fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
-        fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
-
-
-               fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
-               unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
-
-               for (int i = 1; i < ftap->num_operators; ++i) {
-                       operator_node* node = ftap->sorted_nodes[i];
-                       fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
-                       total_mem += node->op->get_mem_footprint();
-               }
-               fprintf(stderr, ", total = %d|", total_mem );
-               fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
-#endif
-
-       fta_stat stats;
-       memset((char*)&stats, 0, sizeof(fta_stat));
-       stats.ftaid = fta->ftaid;
-       stats.in_tuple_cnt = ftap->in_tuple_cnt;
-       stats.out_tuple_cnt = ftap->out_tuple_cnt;
-       stats.out_tuple_sz = ftap->out_tuple_sz;
-       stats.cycle_cnt = ftap->cycle_cnt;
-
-       // Send a hearbeat message to clearinghouse.
-       fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
-
-       // resets runtime stats
-       ftap->in_tuple_cnt = 0;
-       ftap->out_tuple_cnt = 0;
-       ftap->out_tuple_sz = 0;
-       ftap->cycle_cnt = 0;
-
-       return 0;
-}
-
-
-#endif // __HFTA_H
-
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+     http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#ifndef __HFTA_H\r
+#define __HFTA_H\r
+\r
+#include "gstypes.h"\r
+#include "host_tuple.h"\r
+#include "base_operator.h"\r
+#include <vector>\r
+#include <map>\r
+#include<math.h>\r
+#include "rdtsc.h"\r
+using namespace std;\r
+\r
+#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))\r
+#define LLMIN(x,y) ((x)<(y)?(x):(y))\r
+#define LLMAX(x,y) ((x)<(y)?(y):(x))\r
+#define UMIN(x,y) ((x)<(y)?(x):(y))\r
+#define UMAX(x,y) ((x)<(y)?(y):(x))\r
+#define EQ(x,y) ((x)==(y))\r
+#define GEQ(x,y) ((x)>=(y))\r
+#define LEQ(x,y) ((x)<=(y))\r
+//     Cast away temporality\r
+#define non_temporal(x)(x)\r
+//     Access math libraries\r
+#define sqrt(x) sqrt(x)\r
+\r
+\r
+extern "C" {\r
+#include <lapp.h>\r
+#include <fta.h>\r
+#include <stdlib.h>\r
+#include <stdio.h>\r
+#include <schemaparser.h>\r
+}\r
+\r
+struct param_block {\r
+       gs_int32_t block_length;\r
+       void* data;\r
+};\r
+\r
+// forward declaration of operator_node\r
+struct operator_node;\r
+\r
+struct lfta_info {\r
+       gs_schemahandle_t schema_handle;\r
+       gs_sp_t schema;\r
+       gs_sp_t fta_name;\r
+#ifdef PLAN_DAG\r
+       list<operator_node*> parent_list;\r
+       list<unsigned> out_channel_list;\r
+#else\r
+       operator_node* parent;\r
+       unsigned output_channel;\r
+#endif\r
+       FTAID f;\r
+\r
+       lfta_info() {\r
+               schema_handle = -1;\r
+               schema = NULL;\r
+               #ifndef PLAN_DAG\r
+                       parent = NULL;\r
+                       output_channel = 0;\r
+               #endif\r
+       }\r
+\r
+       ~lfta_info() {\r
+               if (fta_name)\r
+                       free (fta_name);\r
+               if (schema)\r
+                       free (schema);\r
+               if (schema_handle >= 0)\r
+                       ftaschema_free(schema_handle);\r
+       }\r
+};\r
+\r
+\r
+struct operator_node {\r
+       base_operator* op;\r
+\r
+#ifdef PLAN_DAG\r
+       list<operator_node*> parent_list;\r
+       list<unsigned> out_channel_list;\r
+#else\r
+       operator_node* parent;\r
+#endif\r
+\r
+       operator_node* left_child;\r
+       operator_node* right_child;\r
+       lfta_info* left_lfta;\r
+       lfta_info* right_lfta;\r
+\r
+       list<host_tuple> input_queue;\r
+\r
+       operator_node(base_operator* o) {\r
+               op = o;\r
+               #ifndef PLAN_DAG\r
+                       parent = NULL;\r
+               #endif\r
+               left_child = right_child = NULL;\r
+               left_lfta = right_lfta = NULL;\r
+       }\r
+\r
+       ~operator_node() {\r
+               delete op;\r
+       }\r
+\r
+       void set_left_child_node(operator_node* child) {\r
+               left_child = child;\r
+               if (child) {\r
+                       #ifdef PLAN_DAG\r
+                               child->parent_list.push_back(this);\r
+                               child->out_channel_list.push_back(0);\r
+                       #else\r
+                               child->parent = this;\r
+                               child->op->set_output_channel(0);\r
+                       #endif\r
+               }\r
+       }\r
+\r
+       void set_right_child_node(operator_node* child) {\r
+               right_child = child;\r
+               if (child) {\r
+                       #ifdef PLAN_DAG\r
+                               child->parent_list.push_back(this);\r
+                               child->out_channel_list.push_back(1);\r
+                       #else\r
+                               child->parent = this;\r
+                               child->op->set_output_channel(1);\r
+                       #endif\r
+               }\r
+       }\r
+\r
+       void set_left_lfta(lfta_info* l_lfta) {\r
+               left_lfta = l_lfta;\r
+               if (left_lfta) {\r
+                       #ifdef PLAN_DAG\r
+                               left_lfta->parent_list.push_back(this);\r
+                               left_lfta->out_channel_list.push_back(0);\r
+                       #else\r
+                               left_lfta->parent = this;\r
+                               left_lfta->output_channel = 0;\r
+                       #endif\r
+               }\r
+       }\r
+\r
+       void set_right_lfta(lfta_info* r_lfta) {\r
+               right_lfta = r_lfta;\r
+               if (right_lfta) {\r
+                       #ifdef PLAN_DAG\r
+                               right_lfta->parent_list.push_back(this);\r
+                               right_lfta->out_channel_list.push_back(1);\r
+                       #else\r
+                               right_lfta->parent = this;\r
+                               right_lfta->output_channel = 1;\r
+                       #endif\r
+               }\r
+       }\r
+\r
+};\r
+\r
+\r
+\r
+int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);\r
+void finalize_tuple(host_tuple &tup);\r
+void finalize_tuple(host_tuple &tup);\r
+gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
+gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
+gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
+gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);\r
+\r
+gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
+gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
+gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
+gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);\r
+\r
+struct UNOP_HFTA {\r
+       struct FTA _fta;\r
+       base_operator* oper;\r
+       FTAID f;\r
+       bool failed;\r
+       gs_schemahandle_t schema_handle;\r
+\r
+       list<host_tuple> output_queue;\r
+\r
+       // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
+       // lfta_name and an instance of the base_operator\r
+       // We don't need to know the output schema as this information is already embeded\r
+       // in create_output_tuple method of operators' functor.\r
+       UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,\r
+               gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {\r
+\r
+               failed = false;\r
+               oper = op;\r
+               f = child_ftaid;\r
+               schema_handle = sch_handle;\r
+\r
+               // assign streamid\r
+               _fta.ftaid = ftaid;\r
+               _fta.ftaid.streamid = (gs_p_t)this;\r
+\r
+#ifdef DEBUG\r
+               fprintf(stderr,"Instantiate a FTA\n");\r
+#endif\r
+               /* extract lfta param block from hfta param block */\r
+               list<param_block> param_list;\r
+               get_lfta_params(sz, value, param_list);\r
+               param_block param = param_list.front();\r
+\r
+\r
+        gs_uint32_t reuse_flag = 2;\r
+        // we will try to create a new instance of child FTA only if it is parameterized\r
+        if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
+               f.streamid = 0; // not interested in existing instances\r
+               reuse_flag = 0;\r
+               }\r
+\r
+               if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {\r
+               fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
+                       failed = true;\r
+                   return;\r
+           }\r
+\r
+               free(param.data);\r
+               // set the operator's parameters\r
+               if(oper->set_param_block(sz, (void*)value)) failed = true;;\r
+\r
+\r
+               fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);\r
+               _fta.stream_subscribed_cnt = 1;\r
+               _fta.stream_subscribed[0] = f;\r
+\r
+               _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)\r
+               _fta.free_fta = UNOP_HFTA_free_fta;\r
+               _fta.control_fta = UNOP_HFTA_control_fta;\r
+               _fta.accept_packet = UNOP_HFTA_accept_packet;\r
+               _fta.clock_fta = UNOP_HFTA_clock_fta;\r
+       }\r
+\r
+       ~UNOP_HFTA() {\r
+         delete oper;    // free operators memory\r
+\r
+     }\r
+\r
+     int flush() {\r
+               list<host_tuple> res;\r
+               if (!oper->flush(res)) {\r
+\r
+                       if (!res.empty()) {\r
+                               // go through the list of returned tuples and finalyze them\r
+                               list<host_tuple>::iterator iter = res.begin();\r
+                               while (iter != res.end()) {\r
+                                       host_tuple& tup = *iter;\r
+\r
+                                       // finalize the tuple\r
+                                       if (tup.tuple_size)\r
+                                               finalize_tuple(tup);\r
+                                       iter++;\r
+                               }\r
+\r
+                               // append returned list to output_queue\r
+                               output_queue.splice(output_queue.end(), res);\r
+\r
+\r
+                               // post tuples\r
+                               while (!output_queue.empty()) {\r
+                                       host_tuple& tup = output_queue.front();\r
+                                       #ifdef DEBUG\r
+                                               fprintf(stderr, "HFTA::about to post tuple\n");\r
+                                       #endif\r
+                                       if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
+                                               tup.free_tuple();\r
+                                               output_queue.pop_front();\r
+                                       } else\r
+                                               break;\r
+                               }\r
+                       }\r
+               }\r
+\r
+               return 0;\r
+        }\r
+\r
+       bool init_failed(){return failed;}\r
+};\r
+\r
+gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
+       UNOP_HFTA* ftap = (UNOP_HFTA*)fta;      // deallocate the fta and call the destructor\r
+                                                               // will be called on program exit\r
+\r
+       if (recursive)\r
+               // free instance we are subscribed to\r
+               fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);\r
+\r
+       delete ftap;\r
+       return 0;\r
+}\r
+\r
+gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
+       UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
+\r
+       if (command == FTA_COMMAND_FLUSH) {\r
+               // ask lfta to do the flush\r
+               fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
+               ftap->flush();\r
+\r
+       } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
+               // ask lfta to do the flush\r
+               fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
+               ftap->flush();\r
+\r
+               /* extract lfta param block from hfta param block */\r
+               list<param_block> param_list;\r
+               get_lfta_params(sz, value, param_list);\r
+               param_block param = param_list.front();\r
+               // load new parameters into lfta\r
+               fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);\r
+               free(param.data);\r
+\r
+               // notify the operator about the change of parameter\r
+               ftap->oper->set_param_block(sz, (void*)value);\r
+\r
+       } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
+               // we no longer use temp_status commands\r
+               // hearbeat mechanism is used instead\r
+       }\r
+       return 0;\r
+}\r
+\r
+gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
+       UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
+#ifdef DEBUG\r
+       fprintf(stderr, "HFTA::accepted packet\n");\r
+#endif\r
+       if (!length)     /* ignore null tuples */\r
+               return 0;\r
+\r
+       host_tuple temp;\r
+       temp.tuple_size = length;\r
+       temp.data = packet;\r
+       temp.channel = 0;\r
+       temp.heap_resident = false;\r
+\r
+       // pass the tuple to operator\r
+       list<host_tuple> res;\r
+       int ret;\r
+       fta_stat* tup_trace = NULL;\r
+       gs_uint32_t tup_trace_sz = 0;\r
+       gs_uint64_t trace_id = 0;\r
+       bool temp_tuple_received = false;\r
+\r
+\r
+       // if the tuple is temporal we need to extract the hearbeat payload\r
+       if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {\r
+               temp_tuple_received = true;\r
+               if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
+                       fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
+       }\r
+\r
+       if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {\r
+               /* perform a flush  */\r
+               ftap->flush();\r
+\r
+               /* post eof_tuple to a parent */\r
+               host_tuple eof_tuple;\r
+               ftap->oper->get_temp_status(eof_tuple);\r
+\r
+               /* last byte of the tuple specifies the tuple type\r
+                * set it to EOF_TUPLE\r
+               */\r
+               *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
+               hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
+\r
+               return 0;\r
+       }\r
+\r
+       ret = ftap->oper->accept_tuple(temp, res);\r
+\r
+       // go through the list of returned tuples and finalyze them\r
+       list<host_tuple>::iterator iter = res.begin();\r
+       while (iter != res.end()) {\r
+               host_tuple& tup = *iter;\r
+\r
+               // finalize the tuple\r
+               if (tup.tuple_size)\r
+                       finalize_tuple(tup);\r
+               iter++;\r
+       }\r
+\r
+       // if we received temporal tuple, last tuple of the result must be temporal too\r
+       // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
+       if (temp_tuple_received) {\r
+               fta_stat stats;\r
+               host_tuple& temp_tup = res.back();\r
+\r
+\r
+               int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
+               char* new_data = (char*)malloc(new_tuple_size);\r
+               memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
+               memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
+               memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
+               memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));\r
+\r
+               memset((char*)&stats, 0, sizeof(fta_stat));\r
+               memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
+\r
+               // Send a hearbeat message to clearinghouse.\r
+               fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
+\r
+               temp_tup.free_tuple();\r
+               temp_tup.data = new_data;\r
+               temp_tup.tuple_size = new_tuple_size;\r
+       }\r
+\r
+       // append returned list to output_queue\r
+       ftap->output_queue.splice(ftap->output_queue.end(), res);\r
+\r
+       // post tuples\r
+       while (!ftap->output_queue.empty()) {\r
+               host_tuple& tup = ftap->output_queue.front();\r
+               #ifdef DEBUG\r
+                       fprintf(stderr, "HFTA::about to post tuple\n");\r
+               #endif\r
+               if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
+                       tup.free_tuple();\r
+                       ftap->output_queue.pop_front();\r
+               } else\r
+                       break;\r
+       }\r
+\r
+       return 1;\r
+}\r
+\r
+gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {\r
+\r
+       // Send a hearbeat message to clearinghouse.to indicate we are alive\r
+       fta_heartbeat(ftap->ftaid, 0, 0, 0);\r
+\r
+       return 0;\r
+}\r
+\r
+\r
+struct MULTOP_HFTA {\r
+       struct FTA _fta;\r
+       gs_csp_t fta_name;\r
+       gs_schemahandle_t schema_handle;\r
+       operator_node* root;\r
+       vector<operator_node*> sorted_nodes;\r
+       int num_operators;\r
+       list<lfta_info*> *lfta_list;\r
+       /* number of eof tuples we received so far\r
+        * receiving eof tuples from every source fta will cause a flush\r
+       */\r
+       int num_eof_tuples;\r
+\r
+       bool failed;\r
+       bool reusable;\r
+\r
+       list<host_tuple> output_queue;\r
+\r
+       // Runtime stats\r
+       gs_uint32_t in_tuple_cnt;\r
+       gs_uint32_t out_tuple_cnt;\r
+       gs_uint32_t out_tuple_sz;\r
+       gs_uint64_t cycle_cnt;\r
+\r
+       gs_uint64_t trace_id;\r
+\r
+       // memory occupied by output queue\r
+       gs_uint32_t output_queue_mem;\r
+\r
+\r
+       // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
+       // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,\r
+       // as the schema handle is already passed during operator creation time.\r
+       // We also don't need to know the output schema as this information is already embeded\r
+       // in create_output_tuple method of operators' functor.\r
+\r
+\r
+\r
+       MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,\r
+               list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {\r
+\r
+               fta_name = name;\r
+               failed = false;\r
+\r
+               root = node;\r
+               lfta_list = lftas;\r
+\r
+               // assign streamid\r
+               _fta.ftaid = ftaid;\r
+               _fta.ftaid.streamid = (gs_p_t)this;\r
+\r
+               schema_handle = sch_handle;\r
+\r
+               output_queue_mem = 0;\r
+\r
+               // topologically sort the operators in the tree (or DAG)\r
+               // for DAG we make sure we add the node to the sorted list only once\r
+               operator_node* current_node;\r
+               map<operator_node*, int> node_map;\r
+               vector<operator_node*> node_list;\r
+\r
+               int i = 0;\r
+               node_list.push_back(root);\r
+               node_map[root] = 0;\r
+\r
+               num_operators = 1;\r
+\r
+               while (i < node_list.size()) {\r
+                       current_node = node_list[i];\r
+                       if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {\r
+                               node_map[current_node->left_child] = num_operators++;\r
+                               node_list.push_back(current_node->left_child);\r
+                       }\r
+                       if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {\r
+                               node_map[current_node->right_child] = num_operators++;\r
+                               node_list.push_back(current_node->right_child);\r
+                       }\r
+                       i++;\r
+               }\r
+               num_operators = i;\r
+\r
+               // build adjacency lists for query DAG\r
+               list<int>* adj_lists = new list<int>[num_operators];\r
+               bool* leaf_flags = new bool[num_operators];\r
+               memset(leaf_flags, 0, num_operators * sizeof(bool));\r
+               for (i = 0; i < num_operators; ++i) {\r
+                       current_node = node_list[i];\r
+                       if (current_node->left_child) {\r
+                               adj_lists[i].push_back(node_map[current_node->left_child]);\r
+                       }\r
+                       if (current_node->right_child && current_node->left_child != current_node->right_child) {\r
+                               adj_lists[i].push_back(node_map[current_node->right_child]);\r
+                       }\r
+               }\r
+\r
+               // run topolofical sort\r
+               bool leaf_found = true;\r
+               while (leaf_found) {\r
+                       leaf_found = false;\r
+                       // add all leafs to sorted_nodes\r
+                       for (i = 0; i < num_operators; ++i) {\r
+                               if (!leaf_flags[i] && adj_lists[i].empty()) {\r
+                                       leaf_flags[i] = true;\r
+                                       sorted_nodes.push_back(node_list[i]);\r
+                                       leaf_found = true;\r
+\r
+                                       // remove the node from its parents adjecency lists\r
+                                       for (int j = 0; j < num_operators; ++j) {\r
+                                               list<int>::iterator iter;\r
+                                               for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {\r
+                                                       if (*iter == i) {\r
+                                                               adj_lists[j].erase(iter);\r
+                                                               break;\r
+                                                       }\r
+                                               }\r
+                                       }\r
+                               }\r
+                       }\r
+               }\r
+\r
+               delete[] adj_lists;\r
+               delete[] leaf_flags;\r
+\r
+               // set the parameter block for every operator in tree\r
+               for (i = 0; i < num_operators; ++i)\r
+                       if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;\r
+\r
+#ifdef DEBUG\r
+               fprintf(stderr,"Instantiate FTAs\n");\r
+#endif\r
+               /* extract lfta param block from hfta param block */\r
+//                     NOTE: param_list must line up with lfta_list\r
+               list<param_block> param_list;\r
+               get_lfta_params(sz, value, param_list);\r
+               list<param_block>::iterator iter1;\r
+               list<lfta_info*>::iterator iter2 = lfta_list->begin();\r
+\r
+               for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {\r
+                       lfta_info* inf = *iter2;\r
+\r
+               #ifdef DEBUG\r
+                       fprintf(stderr,"Instantiate a FTA\n");\r
+               #endif\r
+\r
+                       gs_uint32_t reuse_flag = 2;\r
+\r
+                       // we will try to create a new instance of child FTA only if it is parameterized\r
+                       if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
+                               (*iter2)->f.streamid = 0;       // not interested in existing instances\r
+                               reuse_flag = 0;\r
+                       }\r
+                       if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {\r
+                               fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
+                               failed = true;\r
+                               return;\r
+                       }\r
+\r
+                       free((*iter1).data);\r
+\r
+                       //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");\r
+\r
+                       _fta.stream_subscribed[i]=(*iter2)->f;\r
+               }\r
+               _fta.stream_subscribed_cnt = i;\r
+\r
+               num_eof_tuples = 0;\r
+\r
+               _fta.alloc_fta = NULL;  // why should this be a part of the FTA (it is a factory function)\r
+               _fta.free_fta = MULTOP_HFTA_free_fta;\r
+               _fta.control_fta = MULTOP_HFTA_control_fta;\r
+               _fta.accept_packet = MULTOP_HFTA_accept_packet;\r
+               _fta.clock_fta = MULTOP_HFTA_clock_fta;\r
+\r
+               // init runtime stats\r
+               in_tuple_cnt = 0;\r
+               out_tuple_cnt = 0;\r
+               out_tuple_sz = 0;\r
+               cycle_cnt = 0;\r
+\r
+       }\r
+\r
+       ~MULTOP_HFTA() {\r
+\r
+               list<lfta_info*>::iterator iter;\r
+               int i = 0;\r
+\r
+               for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {\r
+               delete *iter;\r
+               }\r
+\r
+               delete root;    // free operators memory\r
+               delete lfta_list;\r
+\r
+\r
+     }\r
+\r
+       int flush() {\r
+\r
+               list<host_tuple> res;\r
+\r
+               // go through the list of operators in topological order\r
+               // and flush them\r
+               list<host_tuple>::iterator iter;\r
+               list<host_tuple> temp_output_queue;\r
+\r
+               for (int i = 0; i < num_operators; ++i) {\r
+                       operator_node* node = sorted_nodes[i];\r
+\r
+#ifdef PLAN_DAG\r
+                       list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;\r
+#else\r
+                       // for trees we can put output tuples directly into parent's input buffer\r
+                       list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;\r
+#endif\r
+                       // consume tuples waiting in your queue\r
+                       for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {\r
+                               node->op->accept_tuple(*(iter), current_output_queue);\r
+                       }\r
+                       node->op->flush(current_output_queue);\r
+                       node->input_queue.clear();\r
+\r
+#ifdef PLAN_DAG\r
+                       // copy the tuples from output queue into input queues of all parents\r
+                       list<operator_node*>::iterator node_iter;\r
+\r
+                       if (!node->parent_list.empty()) {\r
+                               // append the content of the output queue to parent input queue\r
+\r
+                               for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {\r
+                                       int* ref_cnt = 0;\r
+                                       if (node->parent_list.size() > 1) {\r
+                                               ref_cnt = (int*)malloc(sizeof(int));\r
+                                               *ref_cnt = node->parent_list.size() - 1;\r
+                                       }\r
+\r
+                                       for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {\r
+                                               (*iter).ref_cnt = ref_cnt;\r
+                                               (*node_iter)->input_queue.push_back(*iter);\r
+                                       }\r
+                               }\r
+                       }\r
+#endif\r
+               }\r
+\r
+               if (!res.empty()) {\r
+                       // go through the list of returned tuples and finalyze them\r
+                       list<host_tuple>::iterator iter = res.begin();\r
+                       while (iter != res.end()) {\r
+                               host_tuple& tup = *iter;\r
+\r
+                               // finalize the tuple\r
+                               if (tup.tuple_size)\r
+                                       finalize_tuple(tup);\r
+\r
+                               output_queue_mem += tup.tuple_size;\r
+                               iter++;\r
+                       }\r
+\r
+                       // append returned list to output_queue\r
+                       output_queue.splice(output_queue.end(), res);\r
+\r
+\r
+                       // post tuples\r
+                       while (!output_queue.empty()) {\r
+                               host_tuple& tup = output_queue.front();\r
+                               #ifdef DEBUG\r
+                                       fprintf(stderr, "HFTA::about to post tuple\n");\r
+                               #endif\r
+                               if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
+                                       output_queue_mem -= tup.tuple_size;\r
+                                       tup.free_tuple();\r
+                                       output_queue.pop_front();\r
+                               } else\r
+                                       break;\r
+                       }\r
+               }\r
+\r
+               return 0;\r
+       }\r
+\r
+       bool init_failed(){return failed;}\r
+};\r
+\r
+\r
+gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
+       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;  // deallocate the fta and call the destructor\r
+                                                               // will be called on program exit\r
+\r
+       if (recursive) {\r
+               // free instance we are subscribed to\r
+               list<lfta_info*>::iterator iter;\r
+               int i = 0;\r
+\r
+               for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {\r
+                       fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);\r
+               }\r
+       }\r
+\r
+       delete ftap;\r
+       return 0;\r
+}\r
+\r
+gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
+       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
+\r
+       if (command == FTA_COMMAND_FLUSH) {\r
+\r
+               // ask lftas to do the flush\r
+               list<lfta_info*>::iterator iter;\r
+               for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {\r
+                       fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);\r
+               }\r
+               // flush hfta operators\r
+               ftap->flush();\r
+\r
+       } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
+\r
+               list<param_block> param_list;\r
+               get_lfta_params(sz, value, param_list);\r
+\r
+               // ask lftas to do the flush and set new parameters\r
+               list<lfta_info*>::iterator iter;\r
+               list<param_block>::iterator iter2;\r
+               for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {\r
+                       fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);\r
+                       fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);\r
+                       free((*iter2).data);\r
+               }\r
+               // flush hfta operators\r
+               ftap->flush();\r
+\r
+               // set the new parameter block for every operator in tree\r
+               for (int i = 0; i < ftap->num_operators; ++i)\r
+                       ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);\r
+\r
+       } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
+               // we no longer use temp_status commands\r
+               // hearbeat mechanism is used instead\r
+       }\r
+       return 0;\r
+}\r
+\r
+gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
+       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
+\r
+       gs_uint64_t start_cycle = rdtsc();\r
+#ifdef DEBUG\r
+       fprintf(stderr, "HFTA::accepted packet\n");\r
+#endif\r
+       if (!length)     /* ignore null tuples */\r
+               return 0;\r
+\r
+       ftap->in_tuple_cnt++;\r
+\r
+       host_tuple temp;\r
+       temp.tuple_size = length;\r
+       temp.data = packet;\r
+       temp.channel = 0;\r
+       temp.heap_resident = false;\r
+\r
+// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
+\r
+       // find from which lfta the tuple came\r
+       list<lfta_info*>::iterator iter;\r
+       lfta_info* inf = NULL;\r
+       int i;\r
+\r
+       for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {\r
+               if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip  &&\r
+                       ftap->_fta.stream_subscribed[i].port == ftaid->port &&\r
+                       ftap->_fta.stream_subscribed[i].index == ftaid->index &&\r
+                       ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {\r
+                       inf = *iter;\r
+                       break;\r
+               }\r
+       }\r
+\r
+       if (!inf) {\r
+               fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");\r
+               exit(1);\r
+       }\r
+\r
+       // route tuple through operator chain\r
+       list<host_tuple> result;\r
+       host_tuple tup;\r
+       int ret;\r
+       #ifndef PLAN_DAG\r
+               temp.channel = inf->output_channel;\r
+       #endif\r
+       operator_node* current_node = NULL, *child = NULL;\r
+       list<host_tuple> temp_output_queue;\r
+\r
+\r
+       fta_stat* tup_trace = NULL;\r
+       gs_uint32_t tup_trace_sz = 0;\r
+       gs_uint64_t trace_id = 0;\r
+       bool temp_tuple_received = false;\r
+\r
+       // if the tuple is temporal we need to extract the heartbeat payload\r
+       if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {\r
+               temp_tuple_received = true;\r
+               if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
+                       fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
+       }\r
+\r
+       if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {\r
+\r
+               if (++ftap->num_eof_tuples < ftap->lfta_list->size())\r
+                       return 0;\r
+\r
+               ftap->num_eof_tuples = 0;\r
+\r
+               /* perform a flush  */\r
+               ftap->flush();\r
+\r
+               /* post eof_tuple to a parent */\r
+               host_tuple eof_tuple;\r
+               ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);\r
+\r
+               /* last byte of the tuple specify the tuple type\r
+                * set it to EOF_TUPLE\r
+               */\r
+               *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
+               hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
+               ftap->out_tuple_cnt++;\r
+               ftap->out_tuple_sz+=eof_tuple.tuple_size;\r
+\r
+               return 0;\r
+       }\r
+\r
+       list<host_tuple>::iterator iter2;\r
+\r
+#ifdef PLAN_DAG\r
+\r
+       // push tuple to all parent operators of the lfta\r
+       list<operator_node*>::iterator node_iter;\r
+       list<unsigned>::iterator chan_iter;\r
+       for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {\r
+               temp.channel = *chan_iter;\r
+               (*node_iter)->input_queue.push_back(temp);\r
+       }\r
+\r
+       for (i = 0; i < ftap->num_operators; ++i) {\r
+\r
+               operator_node* node = ftap->sorted_nodes[i];\r
+               list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;\r
+\r
+               // consume tuples waiting in your queue\r
+               for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {\r
+                       node->op->accept_tuple(*(iter2), current_output_queue);\r
+               }\r
+               node->input_queue.clear();\r
+\r
+               // copy the tuples from output queue into input queues of all parents\r
+\r
+               if (!node->parent_list.empty()) {\r
+\r
+                       // append the content of the output queue to parent input queue\r
+                       for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {\r
+\r
+                               int* ref_cnt = 0;\r
+                               if (node->parent_list.size() > 1) {\r
+                                       ref_cnt = (int*)malloc(sizeof(int));\r
+                                       *ref_cnt = node->parent_list.size() - 1;\r
+                               }\r
+\r
+                               for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {\r
+                                       (*iter2).ref_cnt = ref_cnt;\r
+                                       (*iter2).channel = *chan_iter;\r
+                                       (*node_iter)->input_queue.push_back(*iter2);\r
+                               }\r
+                       }\r
+               }\r
+               temp_output_queue.clear();\r
+       }\r
+#else\r
+       current_node = inf->parent;\r
+\r
+// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
+       current_node->input_queue.push_back(temp);\r
+\r
+       do {\r
+//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);\r
+               list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;\r
+\r
+               // consume tuples waiting in your queue\r
+               for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {\r
+                       current_node->op->accept_tuple((*iter2),current_output_queue);\r
+               }\r
+//                     All consumed, delete them\r
+               current_node->input_queue.clear();\r
+               current_node = current_node->parent;\r
+\r
+       } while (current_node);\r
+#endif\r
+\r
+\r
+       host_tuple temp_tup;\r
+\r
+       bool no_temp_tuple = false;\r
+\r
+       // if we received temporal tuple, last tuple of the result must be temporal too\r
+       // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
+       if (temp_tuple_received) {\r
+\r
+               if (result.empty()) {\r
+                       no_temp_tuple = true;\r
+\r
+               } else {\r
+                       fta_stat stats;\r
+                       temp_tup = result.back();\r
+                       finalize_tuple(temp_tup);\r
+\r
+                       int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
+                       char* new_data = (char*)malloc(new_tuple_size);\r
+                       memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
+                       memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
+                       memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
+\r
+\r
+                       memset((char*)&stats, 0, sizeof(fta_stat));\r
+                       stats.ftaid = fta->ftaid;\r
+                       stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
+                       stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
+                       stats.out_tuple_sz = ftap->out_tuple_sz;\r
+                       stats.cycle_cnt = ftap->cycle_cnt;\r
+                       memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
+\r
+                       // Send a hearbeat message to clearinghouse.\r
+                       fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
+\r
+                       // reset the stats\r
+                       ftap->in_tuple_cnt = 0;\r
+                       ftap->out_tuple_cnt = 0;\r
+                       ftap->out_tuple_sz = 0;\r
+                       ftap->cycle_cnt = 0;\r
+\r
+                       free(temp_tup.data);\r
+                       temp_tup.data = new_data;\r
+                       temp_tup.tuple_size = new_tuple_size;\r
+                       result.pop_back();\r
+               }\r
+       }\r
+\r
+       // go through the list of returned tuples and finalyze them\r
+       // since we can produce multiple temporal tuples in DAG plans\r
+       // we can drop all of them except the last one\r
+       iter2 = result.begin();\r
+       while(iter2 != result.end()) {\r
+               host_tuple tup = *iter2;\r
+\r
+               // finalize the tuple\r
+               if (tup.tuple_size) {\r
+                       finalize_tuple(tup);\r
+\r
+       #ifdef PLAN_DAG\r
+               if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))\r
+                       tup.free_tuple();\r
+               else\r
+       #endif\r
+                       {\r
+                       ftap->output_queue.push_back(tup);\r
+                       ftap->output_queue_mem += tup.tuple_size;\r
+                       }\r
+\r
+               }\r
+               iter2++;\r
+       }\r
+\r
+       // append returned list to output_queue\r
+       // ftap->output_queue.splice(ftap->output_queue.end(), result);\r
+\r
+       if (temp_tuple_received && !no_temp_tuple) {\r
+               ftap->output_queue.push_back(temp_tup);\r
+               ftap->output_queue_mem += temp_tup.tuple_size;\r
+       }\r
+\r
+       // post tuples\r
+       while (!ftap->output_queue.empty()) {\r
+               host_tuple& tup = ftap->output_queue.front();\r
+               #ifdef DEBUG\r
+                       fprintf(stderr, "HFTA::about to post tuple\n");\r
+               #endif\r
+               if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
+                       ftap->out_tuple_cnt++;\r
+                       ftap->out_tuple_sz+=tup.tuple_size;\r
+                       ftap->output_queue_mem -= tup.tuple_size;\r
+                       tup.free_tuple();\r
+                       ftap->output_queue.pop_front();\r
+               } else\r
+                       break;\r
+       }\r
+\r
+       ftap->cycle_cnt += rdtsc() - start_cycle;\r
+\r
+       return 1;\r
+}\r
+\r
+gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {\r
+       MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
+\r
+#ifdef HFTA_PROFILE\r
+        /* Print stats */\r
+        fprintf(stderr, "FTA = %s|", ftap->fta_name);\r
+        fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);\r
+        fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);\r
+        fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);\r
+        fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);\r
+\r
+\r
+               fprintf(stderr, "mem_footprint  %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());\r
+               unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();\r
+\r
+               for (int i = 1; i < ftap->num_operators; ++i) {\r
+                       operator_node* node = ftap->sorted_nodes[i];\r
+                       fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());\r
+                       total_mem += node->op->get_mem_footprint();\r
+               }\r
+               fprintf(stderr, ", total = %d|", total_mem );\r
+               fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );\r
+#endif\r
+\r
+       fta_stat stats;\r
+       memset((char*)&stats, 0, sizeof(fta_stat));\r
+       stats.ftaid = fta->ftaid;\r
+       stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
+       stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
+       stats.out_tuple_sz = ftap->out_tuple_sz;\r
+       stats.cycle_cnt = ftap->cycle_cnt;\r
+\r
+       // Send a hearbeat message to clearinghouse.\r
+       fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);\r
+\r
+       // resets runtime stats\r
+       ftap->in_tuple_cnt = 0;\r
+       ftap->out_tuple_cnt = 0;\r
+       ftap->out_tuple_sz = 0;\r
+       ftap->cycle_cnt = 0;\r
+\r
+       return 0;\r
+}\r
+\r
+\r
+#endif // __HFTA_H\r
+\r