-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
-\r
- http://www.apache.org/licenses/LICENSE-2.0\r
-\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#ifndef __HFTA_H\r
-#define __HFTA_H\r
-\r
-#include "gstypes.h"\r
-#include "host_tuple.h"\r
-#include "base_operator.h"\r
-#include <vector>\r
-#include <map>\r
-#include<math.h>\r
-#include "rdtsc.h"\r
-using namespace std;\r
-\r
-#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))\r
-#define LLMIN(x,y) ((x)<(y)?(x):(y))\r
-#define LLMAX(x,y) ((x)<(y)?(y):(x))\r
-#define UMIN(x,y) ((x)<(y)?(x):(y))\r
-#define UMAX(x,y) ((x)<(y)?(y):(x))\r
-#define EQ(x,y) ((x)==(y))\r
-#define GEQ(x,y) ((x)>=(y))\r
-#define LEQ(x,y) ((x)<=(y))\r
-// Cast away temporality\r
-#define non_temporal(x)(x)\r
-// Access math libraries\r
-#define sqrt(x) sqrt(x)\r
-\r
-\r
-extern "C" {\r
-#include <lapp.h>\r
-#include <fta.h>\r
-#include <stdlib.h>\r
-#include <stdio.h>\r
-#include <schemaparser.h>\r
-}\r
-\r
-struct param_block {\r
- gs_int32_t block_length;\r
- void* data;\r
-};\r
-\r
-// forward declaration of operator_node\r
-struct operator_node;\r
-\r
-struct lfta_info {\r
- gs_schemahandle_t schema_handle;\r
- gs_sp_t schema;\r
- gs_sp_t fta_name;\r
-#ifdef PLAN_DAG\r
- list<operator_node*> parent_list;\r
- list<unsigned> out_channel_list;\r
-#else\r
- operator_node* parent;\r
- unsigned output_channel;\r
-#endif\r
- FTAID f;\r
-\r
- lfta_info() {\r
- schema_handle = -1;\r
- schema = NULL;\r
- #ifndef PLAN_DAG\r
- parent = NULL;\r
- output_channel = 0;\r
- #endif\r
- }\r
-\r
- ~lfta_info() {\r
- if (fta_name)\r
- free (fta_name);\r
- if (schema)\r
- free (schema);\r
- if (schema_handle >= 0)\r
- ftaschema_free(schema_handle);\r
- }\r
-};\r
-\r
-\r
-struct operator_node {\r
- base_operator* op;\r
-\r
-#ifdef PLAN_DAG\r
- list<operator_node*> parent_list;\r
- list<unsigned> out_channel_list;\r
-#else\r
- operator_node* parent;\r
-#endif\r
-\r
- operator_node* left_child;\r
- operator_node* right_child;\r
- lfta_info* left_lfta;\r
- lfta_info* right_lfta;\r
-\r
- list<host_tuple> input_queue;\r
-\r
- operator_node(base_operator* o) {\r
- op = o;\r
- #ifndef PLAN_DAG\r
- parent = NULL;\r
- #endif\r
- left_child = right_child = NULL;\r
- left_lfta = right_lfta = NULL;\r
- }\r
-\r
- ~operator_node() {\r
- delete op;\r
- }\r
-\r
- void set_left_child_node(operator_node* child) {\r
- left_child = child;\r
- if (child) {\r
- #ifdef PLAN_DAG\r
- child->parent_list.push_back(this);\r
- child->out_channel_list.push_back(0);\r
- #else\r
- child->parent = this;\r
- child->op->set_output_channel(0);\r
- #endif\r
- }\r
- }\r
-\r
- void set_right_child_node(operator_node* child) {\r
- right_child = child;\r
- if (child) {\r
- #ifdef PLAN_DAG\r
- child->parent_list.push_back(this);\r
- child->out_channel_list.push_back(1);\r
- #else\r
- child->parent = this;\r
- child->op->set_output_channel(1);\r
- #endif\r
- }\r
- }\r
-\r
- void set_left_lfta(lfta_info* l_lfta) {\r
- left_lfta = l_lfta;\r
- if (left_lfta) {\r
- #ifdef PLAN_DAG\r
- left_lfta->parent_list.push_back(this);\r
- left_lfta->out_channel_list.push_back(0);\r
- #else\r
- left_lfta->parent = this;\r
- left_lfta->output_channel = 0;\r
- #endif\r
- }\r
- }\r
-\r
- void set_right_lfta(lfta_info* r_lfta) {\r
- right_lfta = r_lfta;\r
- if (right_lfta) {\r
- #ifdef PLAN_DAG\r
- right_lfta->parent_list.push_back(this);\r
- right_lfta->out_channel_list.push_back(1);\r
- #else\r
- right_lfta->parent = this;\r
- right_lfta->output_channel = 1;\r
- #endif\r
- }\r
- }\r
-\r
-};\r
-\r
-\r
-\r
-int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);\r
-void finalize_tuple(host_tuple &tup);\r
-void finalize_tuple(host_tuple &tup);\r
-gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
-gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
-gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
-gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);\r
-\r
-gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
-gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
-gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
-gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);\r
-\r
-struct UNOP_HFTA {\r
- struct FTA _fta;\r
- base_operator* oper;\r
- FTAID f;\r
- bool failed;\r
- gs_schemahandle_t schema_handle;\r
-\r
- list<host_tuple> output_queue;\r
-\r
- // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
- // lfta_name and an instance of the base_operator\r
- // We don't need to know the output schema as this information is already embeded\r
- // in create_output_tuple method of operators' functor.\r
- UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,\r
- gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {\r
-\r
- failed = false;\r
- oper = op;\r
- f = child_ftaid;\r
- schema_handle = sch_handle;\r
-\r
- // assign streamid\r
- _fta.ftaid = ftaid;\r
- _fta.ftaid.streamid = (gs_p_t)this;\r
-\r
-#ifdef DEBUG\r
- fprintf(stderr,"Instantiate a FTA\n");\r
-#endif\r
- /* extract lfta param block from hfta param block */\r
- list<param_block> param_list;\r
- get_lfta_params(sz, value, param_list);\r
- param_block param = param_list.front();\r
-\r
-\r
- gs_uint32_t reuse_flag = 2;\r
- // we will try to create a new instance of child FTA only if it is parameterized\r
- if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
- f.streamid = 0; // not interested in existing instances\r
- reuse_flag = 0;\r
- }\r
-\r
- if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {\r
- fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
- failed = true;\r
- return;\r
- }\r
-\r
- free(param.data);\r
- // set the operator's parameters\r
- if(oper->set_param_block(sz, (void*)value)) failed = true;;\r
-\r
-\r
- fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);\r
- _fta.stream_subscribed_cnt = 1;\r
- _fta.stream_subscribed[0] = f;\r
-\r
- _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)\r
- _fta.free_fta = UNOP_HFTA_free_fta;\r
- _fta.control_fta = UNOP_HFTA_control_fta;\r
- _fta.accept_packet = UNOP_HFTA_accept_packet;\r
- _fta.clock_fta = UNOP_HFTA_clock_fta;\r
- }\r
-\r
- ~UNOP_HFTA() {\r
- delete oper; // free operators memory\r
-\r
- }\r
-\r
- int flush() {\r
- list<host_tuple> res;\r
- if (!oper->flush(res)) {\r
-\r
- if (!res.empty()) {\r
- // go through the list of returned tuples and finalyze them\r
- list<host_tuple>::iterator iter = res.begin();\r
- while (iter != res.end()) {\r
- host_tuple& tup = *iter;\r
-\r
- // finalize the tuple\r
- if (tup.tuple_size)\r
- finalize_tuple(tup);\r
- iter++;\r
- }\r
-\r
- // append returned list to output_queue\r
- output_queue.splice(output_queue.end(), res);\r
-\r
-\r
- // post tuples\r
- while (!output_queue.empty()) {\r
- host_tuple& tup = output_queue.front();\r
- #ifdef DEBUG\r
- fprintf(stderr, "HFTA::about to post tuple\n");\r
- #endif\r
- if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
- tup.free_tuple();\r
- output_queue.pop_front();\r
- } else\r
- break;\r
- }\r
- }\r
- }\r
-\r
- return 0;\r
- }\r
-\r
- bool init_failed(){return failed;}\r
-};\r
-\r
-gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
- UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor\r
- // will be called on program exit\r
-\r
- if (recursive)\r
- // free instance we are subscribed to\r
- fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);\r
-\r
- delete ftap;\r
- return 0;\r
-}\r
-\r
-gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
- UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
-\r
- if (command == FTA_COMMAND_FLUSH) {\r
- // ask lfta to do the flush\r
- fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
- ftap->flush();\r
-\r
- } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
- // ask lfta to do the flush\r
- fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
- ftap->flush();\r
-\r
- /* extract lfta param block from hfta param block */\r
- list<param_block> param_list;\r
- get_lfta_params(sz, value, param_list);\r
- param_block param = param_list.front();\r
- // load new parameters into lfta\r
- fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);\r
- free(param.data);\r
-\r
- // notify the operator about the change of parameter\r
- ftap->oper->set_param_block(sz, (void*)value);\r
-\r
- } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
- // we no longer use temp_status commands\r
- // hearbeat mechanism is used instead\r
- }\r
- return 0;\r
-}\r
-\r
-gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
- UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
-#ifdef DEBUG\r
- fprintf(stderr, "HFTA::accepted packet\n");\r
-#endif\r
- if (!length) /* ignore null tuples */\r
- return 0;\r
-\r
- host_tuple temp;\r
- temp.tuple_size = length;\r
- temp.data = packet;\r
- temp.channel = 0;\r
- temp.heap_resident = false;\r
-\r
- // pass the tuple to operator\r
- list<host_tuple> res;\r
- int ret;\r
- fta_stat* tup_trace = NULL;\r
- gs_uint32_t tup_trace_sz = 0;\r
- gs_uint64_t trace_id = 0;\r
- bool temp_tuple_received = false;\r
-\r
-\r
- // if the tuple is temporal we need to extract the hearbeat payload\r
- if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {\r
- temp_tuple_received = true;\r
- if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
- fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
- }\r
-\r
- if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {\r
- /* perform a flush */\r
- ftap->flush();\r
-\r
- /* post eof_tuple to a parent */\r
- host_tuple eof_tuple;\r
- ftap->oper->get_temp_status(eof_tuple);\r
-\r
- /* last byte of the tuple specifies the tuple type\r
- * set it to EOF_TUPLE\r
- */\r
- *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
- hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
-\r
- return 0;\r
- }\r
-\r
- ret = ftap->oper->accept_tuple(temp, res);\r
-\r
- // go through the list of returned tuples and finalyze them\r
- list<host_tuple>::iterator iter = res.begin();\r
- while (iter != res.end()) {\r
- host_tuple& tup = *iter;\r
-\r
- // finalize the tuple\r
- if (tup.tuple_size)\r
- finalize_tuple(tup);\r
- iter++;\r
- }\r
-\r
- // if we received temporal tuple, last tuple of the result must be temporal too\r
- // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
- if (temp_tuple_received) {\r
- fta_stat stats;\r
- host_tuple& temp_tup = res.back();\r
-\r
-\r
- int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
- char* new_data = (char*)malloc(new_tuple_size);\r
- memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
- memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));\r
-\r
- memset((char*)&stats, 0, sizeof(fta_stat));\r
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
-\r
- // Send a hearbeat message to clearinghouse.\r
- fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
-\r
- temp_tup.free_tuple();\r
- temp_tup.data = new_data;\r
- temp_tup.tuple_size = new_tuple_size;\r
- }\r
-\r
- // append returned list to output_queue\r
- ftap->output_queue.splice(ftap->output_queue.end(), res);\r
-\r
- // post tuples\r
- while (!ftap->output_queue.empty()) {\r
- host_tuple& tup = ftap->output_queue.front();\r
- #ifdef DEBUG\r
- fprintf(stderr, "HFTA::about to post tuple\n");\r
- #endif\r
- if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
- tup.free_tuple();\r
- ftap->output_queue.pop_front();\r
- } else\r
- break;\r
- }\r
-\r
- return 1;\r
-}\r
-\r
-gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {\r
-\r
- // Send a hearbeat message to clearinghouse.to indicate we are alive\r
- fta_heartbeat(ftap->ftaid, 0, 0, 0);\r
-\r
- return 0;\r
-}\r
-\r
-\r
-struct MULTOP_HFTA {\r
- struct FTA _fta;\r
- gs_csp_t fta_name;\r
- gs_schemahandle_t schema_handle;\r
- operator_node* root;\r
- vector<operator_node*> sorted_nodes;\r
- int num_operators;\r
- list<lfta_info*> *lfta_list;\r
- /* number of eof tuples we received so far\r
- * receiving eof tuples from every source fta will cause a flush\r
- */\r
- int num_eof_tuples;\r
-\r
- bool failed;\r
- bool reusable;\r
-\r
- list<host_tuple> output_queue;\r
-\r
- // Runtime stats\r
- gs_uint32_t in_tuple_cnt;\r
- gs_uint32_t out_tuple_cnt;\r
- gs_uint32_t out_tuple_sz;\r
- gs_uint64_t cycle_cnt;\r
-\r
- gs_uint64_t trace_id;\r
-\r
- // memory occupied by output queue\r
- gs_uint32_t output_queue_mem;\r
-\r
-\r
- // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
- // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,\r
- // as the schema handle is already passed during operator creation time.\r
- // We also don't need to know the output schema as this information is already embeded\r
- // in create_output_tuple method of operators' functor.\r
-\r
-\r
-\r
- MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,\r
- list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {\r
-\r
- fta_name = name;\r
- failed = false;\r
-\r
- root = node;\r
- lfta_list = lftas;\r
-\r
- // assign streamid\r
- _fta.ftaid = ftaid;\r
- _fta.ftaid.streamid = (gs_p_t)this;\r
-\r
- schema_handle = sch_handle;\r
-\r
- output_queue_mem = 0;\r
-\r
- // topologically sort the operators in the tree (or DAG)\r
- // for DAG we make sure we add the node to the sorted list only once\r
- operator_node* current_node;\r
- map<operator_node*, int> node_map;\r
- vector<operator_node*> node_list;\r
-\r
- int i = 0;\r
- node_list.push_back(root);\r
- node_map[root] = 0;\r
-\r
- num_operators = 1;\r
-\r
- while (i < node_list.size()) {\r
- current_node = node_list[i];\r
- if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {\r
- node_map[current_node->left_child] = num_operators++;\r
- node_list.push_back(current_node->left_child);\r
- }\r
- if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {\r
- node_map[current_node->right_child] = num_operators++;\r
- node_list.push_back(current_node->right_child);\r
- }\r
- i++;\r
- }\r
- num_operators = i;\r
-\r
- // build adjacency lists for query DAG\r
- list<int>* adj_lists = new list<int>[num_operators];\r
- bool* leaf_flags = new bool[num_operators];\r
- memset(leaf_flags, 0, num_operators * sizeof(bool));\r
- for (i = 0; i < num_operators; ++i) {\r
- current_node = node_list[i];\r
- if (current_node->left_child) {\r
- adj_lists[i].push_back(node_map[current_node->left_child]);\r
- }\r
- if (current_node->right_child && current_node->left_child != current_node->right_child) {\r
- adj_lists[i].push_back(node_map[current_node->right_child]);\r
- }\r
- }\r
-\r
- // run topolofical sort\r
- bool leaf_found = true;\r
- while (leaf_found) {\r
- leaf_found = false;\r
- // add all leafs to sorted_nodes\r
- for (i = 0; i < num_operators; ++i) {\r
- if (!leaf_flags[i] && adj_lists[i].empty()) {\r
- leaf_flags[i] = true;\r
- sorted_nodes.push_back(node_list[i]);\r
- leaf_found = true;\r
-\r
- // remove the node from its parents adjecency lists\r
- for (int j = 0; j < num_operators; ++j) {\r
- list<int>::iterator iter;\r
- for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {\r
- if (*iter == i) {\r
- adj_lists[j].erase(iter);\r
- break;\r
- }\r
- }\r
- }\r
- }\r
- }\r
- }\r
-\r
- delete[] adj_lists;\r
- delete[] leaf_flags;\r
-\r
- // set the parameter block for every operator in tree\r
- for (i = 0; i < num_operators; ++i)\r
- if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;\r
-\r
-#ifdef DEBUG\r
- fprintf(stderr,"Instantiate FTAs\n");\r
-#endif\r
- /* extract lfta param block from hfta param block */\r
-// NOTE: param_list must line up with lfta_list\r
- list<param_block> param_list;\r
- get_lfta_params(sz, value, param_list);\r
- list<param_block>::iterator iter1;\r
- list<lfta_info*>::iterator iter2 = lfta_list->begin();\r
-\r
- for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {\r
- lfta_info* inf = *iter2;\r
-\r
- #ifdef DEBUG\r
- fprintf(stderr,"Instantiate a FTA\n");\r
- #endif\r
-\r
- gs_uint32_t reuse_flag = 2;\r
-\r
- // we will try to create a new instance of child FTA only if it is parameterized\r
- if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
- (*iter2)->f.streamid = 0; // not interested in existing instances\r
- reuse_flag = 0;\r
- }\r
- if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {\r
- fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
- failed = true;\r
- return;\r
- }\r
-\r
- free((*iter1).data);\r
-\r
- //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");\r
-\r
- _fta.stream_subscribed[i]=(*iter2)->f;\r
- }\r
- _fta.stream_subscribed_cnt = i;\r
-\r
- num_eof_tuples = 0;\r
-\r
- _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)\r
- _fta.free_fta = MULTOP_HFTA_free_fta;\r
- _fta.control_fta = MULTOP_HFTA_control_fta;\r
- _fta.accept_packet = MULTOP_HFTA_accept_packet;\r
- _fta.clock_fta = MULTOP_HFTA_clock_fta;\r
-\r
- // init runtime stats\r
- in_tuple_cnt = 0;\r
- out_tuple_cnt = 0;\r
- out_tuple_sz = 0;\r
- cycle_cnt = 0;\r
-\r
- }\r
-\r
- ~MULTOP_HFTA() {\r
-\r
- list<lfta_info*>::iterator iter;\r
- int i = 0;\r
-\r
- for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {\r
- delete *iter;\r
- }\r
-\r
- delete root; // free operators memory\r
- delete lfta_list;\r
-\r
-\r
- }\r
-\r
- int flush() {\r
-\r
- list<host_tuple> res;\r
-\r
- // go through the list of operators in topological order\r
- // and flush them\r
- list<host_tuple>::iterator iter;\r
- list<host_tuple> temp_output_queue;\r
-\r
- for (int i = 0; i < num_operators; ++i) {\r
- operator_node* node = sorted_nodes[i];\r
-\r
-#ifdef PLAN_DAG\r
- list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;\r
-#else\r
- // for trees we can put output tuples directly into parent's input buffer\r
- list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;\r
-#endif\r
- // consume tuples waiting in your queue\r
- for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {\r
- node->op->accept_tuple(*(iter), current_output_queue);\r
- }\r
- node->op->flush(current_output_queue);\r
- node->input_queue.clear();\r
-\r
-#ifdef PLAN_DAG\r
- // copy the tuples from output queue into input queues of all parents\r
- list<operator_node*>::iterator node_iter;\r
-\r
- if (!node->parent_list.empty()) {\r
- // append the content of the output queue to parent input queue\r
-\r
- for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {\r
- int* ref_cnt = 0;\r
- if (node->parent_list.size() > 1) {\r
- ref_cnt = (int*)malloc(sizeof(int));\r
- *ref_cnt = node->parent_list.size() - 1;\r
- }\r
-\r
- for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {\r
- (*iter).ref_cnt = ref_cnt;\r
- (*node_iter)->input_queue.push_back(*iter);\r
- }\r
- }\r
- }\r
-#endif\r
- }\r
-\r
- if (!res.empty()) {\r
- // go through the list of returned tuples and finalyze them\r
- list<host_tuple>::iterator iter = res.begin();\r
- while (iter != res.end()) {\r
- host_tuple& tup = *iter;\r
-\r
- // finalize the tuple\r
- if (tup.tuple_size)\r
- finalize_tuple(tup);\r
-\r
- output_queue_mem += tup.tuple_size;\r
- iter++;\r
- }\r
-\r
- // append returned list to output_queue\r
- output_queue.splice(output_queue.end(), res);\r
-\r
-\r
- // post tuples\r
- while (!output_queue.empty()) {\r
- host_tuple& tup = output_queue.front();\r
- #ifdef DEBUG\r
- fprintf(stderr, "HFTA::about to post tuple\n");\r
- #endif\r
- if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
- output_queue_mem -= tup.tuple_size;\r
- tup.free_tuple();\r
- output_queue.pop_front();\r
- } else\r
- break;\r
- }\r
- }\r
-\r
- return 0;\r
- }\r
-\r
- bool init_failed(){return failed;}\r
-};\r
-\r
-\r
-gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor\r
- // will be called on program exit\r
-\r
- if (recursive) {\r
- // free instance we are subscribed to\r
- list<lfta_info*>::iterator iter;\r
- int i = 0;\r
-\r
- for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {\r
- fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);\r
- }\r
- }\r
-\r
- delete ftap;\r
- return 0;\r
-}\r
-\r
-gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
-\r
- if (command == FTA_COMMAND_FLUSH) {\r
-\r
- // ask lftas to do the flush\r
- list<lfta_info*>::iterator iter;\r
- for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {\r
- fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);\r
- }\r
- // flush hfta operators\r
- ftap->flush();\r
-\r
- } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
-\r
- list<param_block> param_list;\r
- get_lfta_params(sz, value, param_list);\r
-\r
- // ask lftas to do the flush and set new parameters\r
- list<lfta_info*>::iterator iter;\r
- list<param_block>::iterator iter2;\r
- for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {\r
- fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);\r
- fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);\r
- free((*iter2).data);\r
- }\r
- // flush hfta operators\r
- ftap->flush();\r
-\r
- // set the new parameter block for every operator in tree\r
- for (int i = 0; i < ftap->num_operators; ++i)\r
- ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);\r
-\r
- } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
- // we no longer use temp_status commands\r
- // hearbeat mechanism is used instead\r
- }\r
- return 0;\r
-}\r
-\r
-gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
-\r
- gs_uint64_t start_cycle = rdtsc();\r
-#ifdef DEBUG\r
- fprintf(stderr, "HFTA::accepted packet\n");\r
-#endif\r
- if (!length) /* ignore null tuples */\r
- return 0;\r
-\r
- ftap->in_tuple_cnt++;\r
-\r
- host_tuple temp;\r
- temp.tuple_size = length;\r
- temp.data = packet;\r
- temp.channel = 0;\r
- temp.heap_resident = false;\r
-\r
-// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
-\r
- // find from which lfta the tuple came\r
- list<lfta_info*>::iterator iter;\r
- lfta_info* inf = NULL;\r
- int i;\r
-\r
- for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {\r
- if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&\r
- ftap->_fta.stream_subscribed[i].port == ftaid->port &&\r
- ftap->_fta.stream_subscribed[i].index == ftaid->index &&\r
- ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {\r
- inf = *iter;\r
- break;\r
- }\r
- }\r
-\r
- if (!inf) {\r
- fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");\r
- exit(1);\r
- }\r
-\r
- // route tuple through operator chain\r
- list<host_tuple> result;\r
- host_tuple tup;\r
- int ret;\r
- #ifndef PLAN_DAG\r
- temp.channel = inf->output_channel;\r
- #endif\r
- operator_node* current_node = NULL, *child = NULL;\r
- list<host_tuple> temp_output_queue;\r
-\r
-\r
- fta_stat* tup_trace = NULL;\r
- gs_uint32_t tup_trace_sz = 0;\r
- gs_uint64_t trace_id = 0;\r
- bool temp_tuple_received = false;\r
-\r
- // if the tuple is temporal we need to extract the heartbeat payload\r
- if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {\r
- temp_tuple_received = true;\r
- if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
- fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
- }\r
-\r
- if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {\r
-\r
- if (++ftap->num_eof_tuples < ftap->lfta_list->size())\r
- return 0;\r
-\r
- ftap->num_eof_tuples = 0;\r
-\r
- /* perform a flush */\r
- ftap->flush();\r
-\r
- /* post eof_tuple to a parent */\r
- host_tuple eof_tuple;\r
- ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);\r
-\r
- /* last byte of the tuple specify the tuple type\r
- * set it to EOF_TUPLE\r
- */\r
- *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
- hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
- ftap->out_tuple_cnt++;\r
- ftap->out_tuple_sz+=eof_tuple.tuple_size;\r
-\r
- return 0;\r
- }\r
-\r
- list<host_tuple>::iterator iter2;\r
-\r
-#ifdef PLAN_DAG\r
-\r
- // push tuple to all parent operators of the lfta\r
- list<operator_node*>::iterator node_iter;\r
- list<unsigned>::iterator chan_iter;\r
- for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {\r
- temp.channel = *chan_iter;\r
- (*node_iter)->input_queue.push_back(temp);\r
- }\r
-\r
- for (i = 0; i < ftap->num_operators; ++i) {\r
-\r
- operator_node* node = ftap->sorted_nodes[i];\r
- list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;\r
-\r
- // consume tuples waiting in your queue\r
- for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {\r
- node->op->accept_tuple(*(iter2), current_output_queue);\r
- }\r
- node->input_queue.clear();\r
-\r
- // copy the tuples from output queue into input queues of all parents\r
-\r
- if (!node->parent_list.empty()) {\r
-\r
- // append the content of the output queue to parent input queue\r
- for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {\r
-\r
- int* ref_cnt = 0;\r
- if (node->parent_list.size() > 1) {\r
- ref_cnt = (int*)malloc(sizeof(int));\r
- *ref_cnt = node->parent_list.size() - 1;\r
- }\r
-\r
- for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {\r
- (*iter2).ref_cnt = ref_cnt;\r
- (*iter2).channel = *chan_iter;\r
- (*node_iter)->input_queue.push_back(*iter2);\r
- }\r
- }\r
- }\r
- temp_output_queue.clear();\r
- }\r
-#else\r
- current_node = inf->parent;\r
-\r
-// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
- current_node->input_queue.push_back(temp);\r
-\r
- do {\r
-//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);\r
- list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;\r
-\r
- // consume tuples waiting in your queue\r
- for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {\r
- current_node->op->accept_tuple((*iter2),current_output_queue);\r
- }\r
-// All consumed, delete them\r
- current_node->input_queue.clear();\r
- current_node = current_node->parent;\r
-\r
- } while (current_node);\r
-#endif\r
-\r
-\r
- host_tuple temp_tup;\r
-\r
- bool no_temp_tuple = false;\r
-\r
- // if we received temporal tuple, last tuple of the result must be temporal too\r
- // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
- if (temp_tuple_received) {\r
-\r
- if (result.empty()) {\r
- no_temp_tuple = true;\r
-\r
- } else {\r
- fta_stat stats;\r
- temp_tup = result.back();\r
- finalize_tuple(temp_tup);\r
-\r
- int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
- char* new_data = (char*)malloc(new_tuple_size);\r
- memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
- memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
-\r
-\r
- memset((char*)&stats, 0, sizeof(fta_stat));\r
- stats.ftaid = fta->ftaid;\r
- stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
- stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
- stats.out_tuple_sz = ftap->out_tuple_sz;\r
- stats.cycle_cnt = ftap->cycle_cnt;\r
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
-\r
- // Send a hearbeat message to clearinghouse.\r
- fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
-\r
- // reset the stats\r
- ftap->in_tuple_cnt = 0;\r
- ftap->out_tuple_cnt = 0;\r
- ftap->out_tuple_sz = 0;\r
- ftap->cycle_cnt = 0;\r
-\r
- free(temp_tup.data);\r
- temp_tup.data = new_data;\r
- temp_tup.tuple_size = new_tuple_size;\r
- result.pop_back();\r
- }\r
- }\r
-\r
- // go through the list of returned tuples and finalyze them\r
- // since we can produce multiple temporal tuples in DAG plans\r
- // we can drop all of them except the last one\r
- iter2 = result.begin();\r
- while(iter2 != result.end()) {\r
- host_tuple tup = *iter2;\r
-\r
- // finalize the tuple\r
- if (tup.tuple_size) {\r
- finalize_tuple(tup);\r
-\r
- #ifdef PLAN_DAG\r
- if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))\r
- tup.free_tuple();\r
- else\r
- #endif\r
- {\r
- ftap->output_queue.push_back(tup);\r
- ftap->output_queue_mem += tup.tuple_size;\r
- }\r
-\r
- }\r
- iter2++;\r
- }\r
-\r
- // append returned list to output_queue\r
- // ftap->output_queue.splice(ftap->output_queue.end(), result);\r
-\r
- if (temp_tuple_received && !no_temp_tuple) {\r
- ftap->output_queue.push_back(temp_tup);\r
- ftap->output_queue_mem += temp_tup.tuple_size;\r
- }\r
-\r
- // post tuples\r
- while (!ftap->output_queue.empty()) {\r
- host_tuple& tup = ftap->output_queue.front();\r
- #ifdef DEBUG\r
- fprintf(stderr, "HFTA::about to post tuple\n");\r
- #endif\r
- if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
- ftap->out_tuple_cnt++;\r
- ftap->out_tuple_sz+=tup.tuple_size;\r
- ftap->output_queue_mem -= tup.tuple_size;\r
- tup.free_tuple();\r
- ftap->output_queue.pop_front();\r
- } else\r
- break;\r
- }\r
-\r
- ftap->cycle_cnt += rdtsc() - start_cycle;\r
-\r
- return 1;\r
-}\r
-\r
-gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {\r
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
-\r
-#ifdef HFTA_PROFILE\r
- /* Print stats */\r
- fprintf(stderr, "FTA = %s|", ftap->fta_name);\r
- fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);\r
- fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);\r
- fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);\r
- fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);\r
-\r
-\r
- fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());\r
- unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();\r
-\r
- for (int i = 1; i < ftap->num_operators; ++i) {\r
- operator_node* node = ftap->sorted_nodes[i];\r
- fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());\r
- total_mem += node->op->get_mem_footprint();\r
- }\r
- fprintf(stderr, ", total = %d|", total_mem );\r
- fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );\r
-#endif\r
-\r
- fta_stat stats;\r
- memset((char*)&stats, 0, sizeof(fta_stat));\r
- stats.ftaid = fta->ftaid;\r
- stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
- stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
- stats.out_tuple_sz = ftap->out_tuple_sz;\r
- stats.cycle_cnt = ftap->cycle_cnt;\r
-\r
- // Send a hearbeat message to clearinghouse.\r
- fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);\r
-\r
- // resets runtime stats\r
- ftap->in_tuple_cnt = 0;\r
- ftap->out_tuple_cnt = 0;\r
- ftap->out_tuple_sz = 0;\r
- ftap->cycle_cnt = 0;\r
-\r
- return 0;\r
-}\r
-\r
-\r
-#endif // __HFTA_H\r
-\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#ifndef __HFTA_H
+#define __HFTA_H
+
+#include "gstypes.h"
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <vector>
+#include <map>
+#include<math.h>
+#include "rdtsc.h"
+using namespace std;
+
+#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))
+
+// min, max
+#define ULLMIN(x,y) (unsigned long long)(((x)<(y)?(x):(y)))
+#define ULLMAX(x,y) (unsigned long long)(((x)<(y)?(y):(x)))
+#define LLMIN(x,y) (long long int)(((x)<(y)?(x):(y)))
+#define LLMAX(x,y) (long long int)(((x)<(y)?(y):(x)))
+#define UMIN(x,y) (unsigned int)(((x)<(y)?(x):(y)))
+#define UMAX(x,y) (unsigned int)(((x)<(y)?(y):(x)))
+#define LMIN(x,y) (int)(((x)<(y)?(x):(y)))
+#define LMAX(x,y) (int)(((x)<(y)?(y):(x)))
+#define FMIN(x,y) (double)(((x)<(y)?(x):(y)))
+#define FMAX(x,y) (double)(((x)<(y)?(y):(x)))
+
+// comparison
+#define EQ(x,y) ((x)==(y))
+#define GEQ(x,y) ((x)>=(y))
+#define GE(x,y) ((x)>(y))
+#define LEQ(x,y) ((x)<=(y))
+#define LE(x,y) ((x)<(y))
+
+// if_else
+#define if_else_f(x,y,z) (double)(((x)==0?(z):(y)))
+#define if_else_ll(x,y,z) (long long int)(((x)==0?(z):(y)))
+#define if_else_ul(x,y,z) (unsigned long long)(((x)==0?(z):(y)))
+#define if_else_u(x,y,z) (unsigned int)(((x)==0?(z):(y)))
+#define if_else_i(x,y,z) (int)(((x)==0?(z):(y)))
+
+// Cast away temporality
+#define non_temporal(x)(x)
+
+// endian swap
+#define endian_swap_ui(x) ( (( (x) & 0xFF000000) >> 24) | (( (x) & 0x00FF0000) >> 8) | (( (x) & 0x0000FF00) << 8) | (( (x) & 0x000000FF) << 24) )
+
+// Access math libraries
+#define sqrt(x) sqrt(x)
+#define pow(x,y) pow((x),(y))
+#define sin(x) sin(x)
+#define cos(x) cos(x)
+#define tan(x) tan(x)
+#define asin(x) asin(x)
+#define acos(x) acos(x)
+#define atan(x) atan(x)
+#define log(x) log(x)
+#define log2(x) log2(x)
+#define log10(x) log10(x)
+#define ceil(x) ceil(x)
+#define floor(x) floor(x)
+#define fmod(x) fmod(x)
+#define trunc(x) trunc(x)
+
+
+extern "C" {
+#include <lapp.h>
+#include <fta.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <schemaparser.h>
+}
+
+struct param_block {
+ gs_int32_t block_length;
+ void* data;
+};
+
+// forward declaration of operator_node
+struct operator_node;
+
+struct lfta_info {
+ gs_schemahandle_t schema_handle;
+ gs_sp_t schema;
+ gs_sp_t fta_name;
+#ifdef PLAN_DAG
+ list<operator_node*> parent_list;
+ list<unsigned> out_channel_list;
+#else
+ operator_node* parent;
+ unsigned output_channel;
+#endif
+ FTAID f;
+
+ lfta_info() {
+ schema_handle = -1;
+ schema = NULL;
+ #ifndef PLAN_DAG
+ parent = NULL;
+ output_channel = 0;
+ #endif
+ }
+
+ ~lfta_info() {
+ if (fta_name)
+ free (fta_name);
+ if (schema)
+ free (schema);
+ if (schema_handle >= 0)
+ ftaschema_free(schema_handle);
+ }
+};
+
+
+struct operator_node {
+ base_operator* op;
+
+#ifdef PLAN_DAG
+ list<operator_node*> parent_list;
+ list<unsigned> out_channel_list;
+#else
+ operator_node* parent;
+#endif
+
+ operator_node* left_child;
+ operator_node* right_child;
+ lfta_info* left_lfta;
+ lfta_info* right_lfta;
+
+ list<host_tuple> input_queue;
+
+ operator_node(base_operator* o) {
+ op = o;
+ #ifndef PLAN_DAG
+ parent = NULL;
+ #endif
+ left_child = right_child = NULL;
+ left_lfta = right_lfta = NULL;
+ }
+
+ ~operator_node() {
+ delete op;
+ }
+
+ void set_left_child_node(operator_node* child) {
+ left_child = child;
+ if (child) {
+ #ifdef PLAN_DAG
+ child->parent_list.push_back(this);
+ child->out_channel_list.push_back(0);
+ #else
+ child->parent = this;
+ child->op->set_output_channel(0);
+ #endif
+ }
+ }
+
+ void set_right_child_node(operator_node* child) {
+ right_child = child;
+ if (child) {
+ #ifdef PLAN_DAG
+ child->parent_list.push_back(this);
+ child->out_channel_list.push_back(1);
+ #else
+ child->parent = this;
+ child->op->set_output_channel(1);
+ #endif
+ }
+ }
+
+ void set_left_lfta(lfta_info* l_lfta) {
+ left_lfta = l_lfta;
+ if (left_lfta) {
+ #ifdef PLAN_DAG
+ left_lfta->parent_list.push_back(this);
+ left_lfta->out_channel_list.push_back(0);
+ #else
+ left_lfta->parent = this;
+ left_lfta->output_channel = 0;
+ #endif
+ }
+ }
+
+ void set_right_lfta(lfta_info* r_lfta) {
+ right_lfta = r_lfta;
+ if (right_lfta) {
+ #ifdef PLAN_DAG
+ right_lfta->parent_list.push_back(this);
+ right_lfta->out_channel_list.push_back(1);
+ #else
+ right_lfta->parent = this;
+ right_lfta->output_channel = 1;
+ #endif
+ }
+ }
+
+};
+
+
+
+int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
+void finalize_tuple(host_tuple &tup);
+void finalize_tuple(host_tuple &tup);
+gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
+gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
+gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
+gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
+
+gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
+gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
+gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
+gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
+
+struct UNOP_HFTA {
+ struct FTA _fta;
+ base_operator* oper;
+ FTAID f;
+ bool failed;
+ gs_schemahandle_t schema_handle;
+
+ list<host_tuple> output_queue;
+
+ // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
+ // lfta_name and an instance of the base_operator
+ // We don't need to know the output schema as this information is already embeded
+ // in create_output_tuple method of operators' functor.
+ UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
+ gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
+
+ failed = false;
+ oper = op;
+ f = child_ftaid;
+ schema_handle = sch_handle;
+
+ // assign streamid
+ _fta.ftaid = ftaid;
+ _fta.ftaid.streamid = (gs_p_t)this;
+
+#ifdef DEBUG
+ fprintf(stderr,"Instantiate a FTA\n");
+#endif
+ /* extract lfta param block from hfta param block */
+ list<param_block> param_list;
+ get_lfta_params(sz, value, param_list);
+ param_block param = param_list.front();
+
+
+ gs_uint32_t reuse_flag = 2;
+ // we will try to create a new instance of child FTA only if it is parameterized
+ if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
+ f.streamid = 0; // not interested in existing instances
+ reuse_flag = 0;
+ }
+
+ if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
+ fprintf(stderr,"HFTA::error:could instantiate a FTA");
+ failed = true;
+ return;
+ }
+
+ free(param.data);
+ // set the operator's parameters
+ if(oper->set_param_block(sz, (void*)value)) failed = true;;
+
+
+ fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
+ _fta.stream_subscribed_cnt = 1;
+ _fta.stream_subscribed[0] = f;
+
+ _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
+ _fta.free_fta = UNOP_HFTA_free_fta;
+ _fta.control_fta = UNOP_HFTA_control_fta;
+ _fta.accept_packet = UNOP_HFTA_accept_packet;
+ _fta.clock_fta = UNOP_HFTA_clock_fta;
+ }
+
+ ~UNOP_HFTA() {
+ delete oper; // free operators memory
+
+ }
+
+ int flush() {
+ list<host_tuple> res;
+ if (!oper->flush(res)) {
+
+ if (!res.empty()) {
+ // go through the list of returned tuples and finalyze them
+ list<host_tuple>::iterator iter = res.begin();
+ while (iter != res.end()) {
+ host_tuple& tup = *iter;
+
+ // finalize the tuple
+ if (tup.tuple_size)
+ finalize_tuple(tup);
+ iter++;
+ }
+
+ // append returned list to output_queue
+ output_queue.splice(output_queue.end(), res);
+
+
+ // post tuples
+ while (!output_queue.empty()) {
+ host_tuple& tup = output_queue.front();
+ #ifdef DEBUG
+ fprintf(stderr, "HFTA::about to post tuple\n");
+ #endif
+ if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
+ tup.free_tuple();
+ output_queue.pop_front();
+ } else
+ break;
+ }
+ }
+ }
+
+ return 0;
+ }
+
+ bool init_failed(){return failed;}
+};
+
+gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
+ UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor
+ // will be called on program exit
+
+ if (recursive)
+ // free instance we are subscribed to
+ fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
+
+ delete ftap;
+ return 0;
+}
+
+gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
+ UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
+
+ if (command == FTA_COMMAND_FLUSH) {
+ // ask lfta to do the flush
+ fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
+ ftap->flush();
+
+ } else if (command == FTA_COMMAND_LOAD_PARAMS) {
+ // ask lfta to do the flush
+ fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
+ ftap->flush();
+
+ /* extract lfta param block from hfta param block */
+ list<param_block> param_list;
+ get_lfta_params(sz, value, param_list);
+ param_block param = param_list.front();
+ // load new parameters into lfta
+ fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
+ free(param.data);
+
+ // notify the operator about the change of parameter
+ ftap->oper->set_param_block(sz, (void*)value);
+
+ } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
+ // we no longer use temp_status commands
+ // hearbeat mechanism is used instead
+ }
+ return 0;
+}
+
+gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
+ UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
+#ifdef DEBUG
+ fprintf(stderr, "HFTA::accepted packet\n");
+#endif
+ if (!length) /* ignore null tuples */
+ return 0;
+
+ host_tuple temp;
+ temp.tuple_size = length;
+ temp.data = packet;
+ temp.channel = 0;
+ temp.heap_resident = false;
+
+ // pass the tuple to operator
+ list<host_tuple> res;
+ int ret;
+ fta_stat* tup_trace = NULL;
+ gs_uint32_t tup_trace_sz = 0;
+ gs_uint64_t trace_id = 0;
+ bool temp_tuple_received = false;
+
+
+ // if the tuple is temporal we need to extract the hearbeat payload
+ if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
+ temp_tuple_received = true;
+ if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
+ fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
+ }
+
+ if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
+ /* perform a flush */
+ ftap->flush();
+
+ /* post eof_tuple to a parent */
+ host_tuple eof_tuple;
+ ftap->oper->get_temp_status(eof_tuple);
+
+ /* last byte of the tuple specifies the tuple type
+ * set it to EOF_TUPLE
+ */
+ *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
+ hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
+
+ return 0;
+ }
+
+ ret = ftap->oper->accept_tuple(temp, res);
+
+ // go through the list of returned tuples and finalyze them
+ list<host_tuple>::iterator iter = res.begin();
+ while (iter != res.end()) {
+ host_tuple& tup = *iter;
+
+ // finalize the tuple
+ if (tup.tuple_size)
+ finalize_tuple(tup);
+ iter++;
+ }
+
+ // if we received temporal tuple, last tuple of the result must be temporal too
+ // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
+ if (temp_tuple_received) {
+ fta_stat stats;
+ host_tuple& temp_tup = res.back();
+
+
+ int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
+ char* new_data = (char*)malloc(new_tuple_size);
+ memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
+ memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
+
+ memset((char*)&stats, 0, sizeof(fta_stat));
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
+
+ // Send a hearbeat message to clearinghouse.
+ fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
+
+ temp_tup.free_tuple();
+ temp_tup.data = new_data;
+ temp_tup.tuple_size = new_tuple_size;
+ }
+
+ // append returned list to output_queue
+ ftap->output_queue.splice(ftap->output_queue.end(), res);
+
+ // post tuples
+ while (!ftap->output_queue.empty()) {
+ host_tuple& tup = ftap->output_queue.front();
+ #ifdef DEBUG
+ fprintf(stderr, "HFTA::about to post tuple\n");
+ #endif
+ if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
+ tup.free_tuple();
+ ftap->output_queue.pop_front();
+ } else
+ break;
+ }
+
+ return 1;
+}
+
+gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
+
+ // Send a hearbeat message to clearinghouse.to indicate we are alive
+ fta_heartbeat(ftap->ftaid, 0, 0, 0);
+
+ return 0;
+}
+
+
+struct MULTOP_HFTA {
+ struct FTA _fta;
+ gs_csp_t fta_name;
+ gs_schemahandle_t schema_handle;
+ operator_node* root;
+ vector<operator_node*> sorted_nodes;
+ int num_operators;
+ list<lfta_info*> *lfta_list;
+ /* number of eof tuples we received so far
+ * receiving eof tuples from every source fta will cause a flush
+ */
+ int num_eof_tuples;
+
+ bool failed;
+ bool reusable;
+
+ list<host_tuple> output_queue;
+
+ // Runtime stats
+ gs_uint32_t in_tuple_cnt;
+ gs_uint32_t out_tuple_cnt;
+ gs_uint32_t out_tuple_sz;
+ gs_uint64_t cycle_cnt;
+
+ gs_uint64_t trace_id;
+
+ // memory occupied by output queue
+ gs_uint32_t output_queue_mem;
+
+
+ // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
+ // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
+ // as the schema handle is already passed during operator creation time.
+ // We also don't need to know the output schema as this information is already embeded
+ // in create_output_tuple method of operators' functor.
+
+
+
+ MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
+ list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
+
+ fta_name = name;
+ failed = false;
+
+ root = node;
+ lfta_list = lftas;
+
+ // assign streamid
+ _fta.ftaid = ftaid;
+ _fta.ftaid.streamid = (gs_p_t)this;
+
+ schema_handle = sch_handle;
+
+ output_queue_mem = 0;
+
+ // topologically sort the operators in the tree (or DAG)
+ // for DAG we make sure we add the node to the sorted list only once
+ operator_node* current_node;
+ map<operator_node*, int> node_map;
+ vector<operator_node*> node_list;
+
+ int i = 0;
+ node_list.push_back(root);
+ node_map[root] = 0;
+
+ num_operators = 1;
+
+ while (i < node_list.size()) {
+ current_node = node_list[i];
+ if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
+ node_map[current_node->left_child] = num_operators++;
+ node_list.push_back(current_node->left_child);
+ }
+ if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
+ node_map[current_node->right_child] = num_operators++;
+ node_list.push_back(current_node->right_child);
+ }
+ i++;
+ }
+ num_operators = i;
+
+ // build adjacency lists for query DAG
+ list<int>* adj_lists = new list<int>[num_operators];
+ bool* leaf_flags = new bool[num_operators];
+ memset(leaf_flags, 0, num_operators * sizeof(bool));
+ for (i = 0; i < num_operators; ++i) {
+ current_node = node_list[i];
+ if (current_node->left_child) {
+ adj_lists[i].push_back(node_map[current_node->left_child]);
+ }
+ if (current_node->right_child && current_node->left_child != current_node->right_child) {
+ adj_lists[i].push_back(node_map[current_node->right_child]);
+ }
+ }
+
+ // run topolofical sort
+ bool leaf_found = true;
+ while (leaf_found) {
+ leaf_found = false;
+ // add all leafs to sorted_nodes
+ for (i = 0; i < num_operators; ++i) {
+ if (!leaf_flags[i] && adj_lists[i].empty()) {
+ leaf_flags[i] = true;
+ sorted_nodes.push_back(node_list[i]);
+ leaf_found = true;
+
+ // remove the node from its parents adjecency lists
+ for (int j = 0; j < num_operators; ++j) {
+ list<int>::iterator iter;
+ for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
+ if (*iter == i) {
+ adj_lists[j].erase(iter);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ delete[] adj_lists;
+ delete[] leaf_flags;
+
+ // set the parameter block for every operator in tree
+ for (i = 0; i < num_operators; ++i)
+ if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
+
+#ifdef DEBUG
+ fprintf(stderr,"Instantiate FTAs\n");
+#endif
+ /* extract lfta param block from hfta param block */
+// NOTE: param_list must line up with lfta_list
+ list<param_block> param_list;
+ get_lfta_params(sz, value, param_list);
+ list<param_block>::iterator iter1;
+ list<lfta_info*>::iterator iter2 = lfta_list->begin();
+
+ for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
+ lfta_info* inf = *iter2;
+
+ #ifdef DEBUG
+ fprintf(stderr,"Instantiate a FTA\n");
+ #endif
+
+ gs_uint32_t reuse_flag = 2;
+
+ // we will try to create a new instance of child FTA only if it is parameterized
+ if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
+ (*iter2)->f.streamid = 0; // not interested in existing instances
+ reuse_flag = 0;
+ }
+ if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
+ fprintf(stderr,"HFTA::error:could instantiate a FTA");
+ failed = true;
+ return;
+ }
+
+ free((*iter1).data);
+
+ //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
+
+ _fta.stream_subscribed[i]=(*iter2)->f;
+ }
+ _fta.stream_subscribed_cnt = i;
+
+ num_eof_tuples = 0;
+
+ _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
+ _fta.free_fta = MULTOP_HFTA_free_fta;
+ _fta.control_fta = MULTOP_HFTA_control_fta;
+ _fta.accept_packet = MULTOP_HFTA_accept_packet;
+ _fta.clock_fta = MULTOP_HFTA_clock_fta;
+
+ // init runtime stats
+ in_tuple_cnt = 0;
+ out_tuple_cnt = 0;
+ out_tuple_sz = 0;
+ cycle_cnt = 0;
+
+ }
+
+ ~MULTOP_HFTA() {
+
+ list<lfta_info*>::iterator iter;
+ int i = 0;
+
+ for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
+ delete *iter;
+ }
+
+ delete root; // free operators memory
+ delete lfta_list;
+
+
+ }
+
+ int flush() {
+
+ list<host_tuple> res;
+
+ // go through the list of operators in topological order
+ // and flush them
+ list<host_tuple>::iterator iter;
+ list<host_tuple> temp_output_queue;
+
+ for (int i = 0; i < num_operators; ++i) {
+ operator_node* node = sorted_nodes[i];
+
+#ifdef PLAN_DAG
+ list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
+#else
+ // for trees we can put output tuples directly into parent's input buffer
+ list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
+#endif
+ // consume tuples waiting in your queue
+ for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
+ node->op->accept_tuple(*(iter), current_output_queue);
+ }
+ node->op->flush(current_output_queue);
+ node->input_queue.clear();
+
+#ifdef PLAN_DAG
+ // copy the tuples from output queue into input queues of all parents
+ list<operator_node*>::iterator node_iter;
+
+ if (!node->parent_list.empty()) {
+ // append the content of the output queue to parent input queue
+
+ for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
+ int* ref_cnt = 0;
+ if (node->parent_list.size() > 1) {
+ ref_cnt = (int*)malloc(sizeof(int));
+ *ref_cnt = node->parent_list.size() - 1;
+ }
+
+ for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
+ (*iter).ref_cnt = ref_cnt;
+ (*node_iter)->input_queue.push_back(*iter);
+ }
+ }
+ }
+#endif
+ }
+
+ if (!res.empty()) {
+ // go through the list of returned tuples and finalyze them
+ list<host_tuple>::iterator iter = res.begin();
+ while (iter != res.end()) {
+ host_tuple& tup = *iter;
+
+ // finalize the tuple
+ if (tup.tuple_size)
+ finalize_tuple(tup);
+
+ output_queue_mem += tup.tuple_size;
+ iter++;
+ }
+
+ // append returned list to output_queue
+ output_queue.splice(output_queue.end(), res);
+
+
+ // post tuples
+ while (!output_queue.empty()) {
+ host_tuple& tup = output_queue.front();
+ #ifdef DEBUG
+ fprintf(stderr, "HFTA::about to post tuple\n");
+ #endif
+ if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
+ output_queue_mem -= tup.tuple_size;
+ tup.free_tuple();
+ output_queue.pop_front();
+ } else
+ break;
+ }
+ }
+
+ return 0;
+ }
+
+ bool init_failed(){return failed;}
+};
+
+
+gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor
+ // will be called on program exit
+
+ if (recursive) {
+ // free instance we are subscribed to
+ list<lfta_info*>::iterator iter;
+ int i = 0;
+
+ for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
+ fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
+ }
+ }
+
+ delete ftap;
+ return 0;
+}
+
+gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
+
+ if (command == FTA_COMMAND_FLUSH) {
+
+ // ask lftas to do the flush
+ list<lfta_info*>::iterator iter;
+ for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
+ fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
+ }
+ // flush hfta operators
+ ftap->flush();
+
+ } else if (command == FTA_COMMAND_LOAD_PARAMS) {
+
+ list<param_block> param_list;
+ get_lfta_params(sz, value, param_list);
+
+ // ask lftas to do the flush and set new parameters
+ list<lfta_info*>::iterator iter;
+ list<param_block>::iterator iter2;
+ for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
+ fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
+ fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
+ free((*iter2).data);
+ }
+ // flush hfta operators
+ ftap->flush();
+
+ // set the new parameter block for every operator in tree
+ for (int i = 0; i < ftap->num_operators; ++i)
+ ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
+
+ } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
+ // we no longer use temp_status commands
+ // hearbeat mechanism is used instead
+ }
+ return 0;
+}
+
+gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
+
+ gs_uint64_t start_cycle = rdtsc();
+#ifdef DEBUG
+ fprintf(stderr, "HFTA::accepted packet\n");
+#endif
+ if (!length) /* ignore null tuples */
+ return 0;
+
+ ftap->in_tuple_cnt++;
+
+ host_tuple temp;
+ temp.tuple_size = length;
+ temp.data = packet;
+ temp.channel = 0;
+ temp.heap_resident = false;
+
+// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
+
+ // find from which lfta the tuple came
+ list<lfta_info*>::iterator iter;
+ lfta_info* inf = NULL;
+ int i;
+
+ for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
+ if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&
+ ftap->_fta.stream_subscribed[i].port == ftaid->port &&
+ ftap->_fta.stream_subscribed[i].index == ftaid->index &&
+ ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
+ inf = *iter;
+ break;
+ }
+ }
+
+ if (!inf) {
+ fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
+ exit(1);
+ }
+
+ // route tuple through operator chain
+ list<host_tuple> result;
+ host_tuple tup;
+ int ret;
+ #ifndef PLAN_DAG
+ temp.channel = inf->output_channel;
+ #endif
+ operator_node* current_node = NULL, *child = NULL;
+ list<host_tuple> temp_output_queue;
+
+
+ fta_stat* tup_trace = NULL;
+ gs_uint32_t tup_trace_sz = 0;
+ gs_uint64_t trace_id = 0;
+ bool temp_tuple_received = false;
+
+ // if the tuple is temporal we need to extract the heartbeat payload
+ if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
+ temp_tuple_received = true;
+ if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
+ fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
+ }
+
+ if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
+
+ if (++ftap->num_eof_tuples < ftap->lfta_list->size())
+ return 0;
+
+ ftap->num_eof_tuples = 0;
+
+ /* perform a flush */
+ ftap->flush();
+
+ /* post eof_tuple to a parent */
+ host_tuple eof_tuple;
+ ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
+
+ /* last byte of the tuple specify the tuple type
+ * set it to EOF_TUPLE
+ */
+ *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
+ hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
+ ftap->out_tuple_cnt++;
+ ftap->out_tuple_sz+=eof_tuple.tuple_size;
+
+ return 0;
+ }
+
+ list<host_tuple>::iterator iter2;
+
+#ifdef PLAN_DAG
+
+ // push tuple to all parent operators of the lfta
+ list<operator_node*>::iterator node_iter;
+ list<unsigned>::iterator chan_iter;
+ for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
+ temp.channel = *chan_iter;
+ (*node_iter)->input_queue.push_back(temp);
+ }
+
+ for (i = 0; i < ftap->num_operators; ++i) {
+
+ operator_node* node = ftap->sorted_nodes[i];
+ list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
+
+ // consume tuples waiting in your queue
+ for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
+ node->op->accept_tuple(*(iter2), current_output_queue);
+ }
+ node->input_queue.clear();
+
+ // copy the tuples from output queue into input queues of all parents
+
+ if (!node->parent_list.empty()) {
+
+ // append the content of the output queue to parent input queue
+ for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
+
+ int* ref_cnt = 0;
+ if (node->parent_list.size() > 1) {
+ ref_cnt = (int*)malloc(sizeof(int));
+ *ref_cnt = node->parent_list.size() - 1;
+ }
+
+ for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
+ (*iter2).ref_cnt = ref_cnt;
+ (*iter2).channel = *chan_iter;
+ (*node_iter)->input_queue.push_back(*iter2);
+ }
+ }
+ }
+ temp_output_queue.clear();
+ }
+#else
+ current_node = inf->parent;
+
+// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
+ current_node->input_queue.push_back(temp);
+
+ do {
+//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
+ list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
+
+ // consume tuples waiting in your queue
+ for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
+ current_node->op->accept_tuple((*iter2),current_output_queue);
+ }
+// All consumed, delete them
+ current_node->input_queue.clear();
+ current_node = current_node->parent;
+
+ } while (current_node);
+#endif
+
+
+ host_tuple temp_tup;
+
+ bool no_temp_tuple = false;
+
+ // if we received temporal tuple, last tuple of the result must be temporal too
+ // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
+ if (temp_tuple_received) {
+
+ if (result.empty()) {
+ no_temp_tuple = true;
+
+ } else {
+ fta_stat stats;
+ temp_tup = result.back();
+ finalize_tuple(temp_tup);
+
+ int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
+ char* new_data = (char*)malloc(new_tuple_size);
+ memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
+ memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
+
+
+ memset((char*)&stats, 0, sizeof(fta_stat));
+ stats.ftaid = fta->ftaid;
+ stats.in_tuple_cnt = ftap->in_tuple_cnt;
+ stats.out_tuple_cnt = ftap->out_tuple_cnt;
+ stats.out_tuple_sz = ftap->out_tuple_sz;
+ stats.cycle_cnt = ftap->cycle_cnt;
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
+
+ // Send a hearbeat message to clearinghouse.
+ fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
+
+ // reset the stats
+ ftap->in_tuple_cnt = 0;
+ ftap->out_tuple_cnt = 0;
+ ftap->out_tuple_sz = 0;
+ ftap->cycle_cnt = 0;
+
+ free(temp_tup.data);
+ temp_tup.data = new_data;
+ temp_tup.tuple_size = new_tuple_size;
+ result.pop_back();
+ }
+ }
+
+ // go through the list of returned tuples and finalyze them
+ // since we can produce multiple temporal tuples in DAG plans
+ // we can drop all of them except the last one
+ iter2 = result.begin();
+ while(iter2 != result.end()) {
+ host_tuple tup = *iter2;
+
+ // finalize the tuple
+ if (tup.tuple_size) {
+ finalize_tuple(tup);
+
+ #ifdef PLAN_DAG
+ if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
+ tup.free_tuple();
+ else
+ #endif
+ {
+ ftap->output_queue.push_back(tup);
+ ftap->output_queue_mem += tup.tuple_size;
+ }
+
+ }
+ iter2++;
+ }
+
+ // append returned list to output_queue
+ // ftap->output_queue.splice(ftap->output_queue.end(), result);
+
+ if (temp_tuple_received && !no_temp_tuple) {
+ ftap->output_queue.push_back(temp_tup);
+ ftap->output_queue_mem += temp_tup.tuple_size;
+ }
+
+ // post tuples
+ while (!ftap->output_queue.empty()) {
+ host_tuple& tup = ftap->output_queue.front();
+ #ifdef DEBUG
+ fprintf(stderr, "HFTA::about to post tuple\n");
+ #endif
+ if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
+ ftap->out_tuple_cnt++;
+ ftap->out_tuple_sz+=tup.tuple_size;
+ ftap->output_queue_mem -= tup.tuple_size;
+ tup.free_tuple();
+ ftap->output_queue.pop_front();
+ } else
+ break;
+ }
+
+ ftap->cycle_cnt += rdtsc() - start_cycle;
+
+ return 1;
+}
+
+gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
+
+#ifdef HFTA_PROFILE
+ /* Print stats */
+ fprintf(stderr, "FTA = %s|", ftap->fta_name);
+ fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
+ fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
+ fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
+ fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
+
+
+ fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
+ unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
+
+ for (int i = 1; i < ftap->num_operators; ++i) {
+ operator_node* node = ftap->sorted_nodes[i];
+ fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
+ total_mem += node->op->get_mem_footprint();
+ }
+ fprintf(stderr, ", total = %d|", total_mem );
+ fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
+#endif
+
+ fta_stat stats;
+ memset((char*)&stats, 0, sizeof(fta_stat));
+ stats.ftaid = fta->ftaid;
+ stats.in_tuple_cnt = ftap->in_tuple_cnt;
+ stats.out_tuple_cnt = ftap->out_tuple_cnt;
+ stats.out_tuple_sz = ftap->out_tuple_sz;
+ stats.cycle_cnt = ftap->cycle_cnt;
+
+ // Send a hearbeat message to clearinghouse.
+ fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
+
+ // resets runtime stats
+ ftap->in_tuple_cnt = 0;
+ ftap->out_tuple_cnt = 0;
+ ftap->out_tuple_sz = 0;
+ ftap->cycle_cnt = 0;
+
+ return 0;
+}
+
+
+#endif // __HFTA_H
+