-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#ifndef __HFTA_H
-#define __HFTA_H
-
-#include "gstypes.h"
-#include "host_tuple.h"
-#include "base_operator.h"
-#include <vector>
-#include <map>
-#include<math.h>
-#include "rdtsc.h"
-using namespace std;
-
-#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))
-#define LLMIN(x,y) ((x)<(y)?(x):(y))
-#define LLMAX(x,y) ((x)<(y)?(y):(x))
-#define UMIN(x,y) ((x)<(y)?(x):(y))
-#define UMAX(x,y) ((x)<(y)?(y):(x))
-#define EQ(x,y) ((x)==(y))
-#define GEQ(x,y) ((x)>=(y))
-#define LEQ(x,y) ((x)<=(y))
-// Cast away temporality
-#define non_temporal(x)(x)
-// Access math libraries
-#define sqrt(x) sqrt(x)
-
-
-extern "C" {
-#include <lapp.h>
-#include <fta.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <schemaparser.h>
-}
-
-struct param_block {
- gs_int32_t block_length;
- void* data;
-};
-
-// forward declaration of operator_node
-struct operator_node;
-
-struct lfta_info {
- gs_schemahandle_t schema_handle;
- gs_sp_t schema;
- gs_sp_t fta_name;
-#ifdef PLAN_DAG
- list<operator_node*> parent_list;
- list<unsigned> out_channel_list;
-#else
- operator_node* parent;
- unsigned output_channel;
-#endif
- FTAID f;
-
- lfta_info() {
- schema_handle = -1;
- schema = NULL;
- #ifndef PLAN_DAG
- parent = NULL;
- output_channel = 0;
- #endif
- }
-
- ~lfta_info() {
- if (fta_name)
- free (fta_name);
- if (schema)
- free (schema);
- if (schema_handle >= 0)
- ftaschema_free(schema_handle);
- }
-};
-
-
-struct operator_node {
- base_operator* op;
-
-#ifdef PLAN_DAG
- list<operator_node*> parent_list;
- list<unsigned> out_channel_list;
-#else
- operator_node* parent;
-#endif
-
- operator_node* left_child;
- operator_node* right_child;
- lfta_info* left_lfta;
- lfta_info* right_lfta;
-
- list<host_tuple> input_queue;
-
- operator_node(base_operator* o) {
- op = o;
- #ifndef PLAN_DAG
- parent = NULL;
- #endif
- left_child = right_child = NULL;
- left_lfta = right_lfta = NULL;
- }
-
- ~operator_node() {
- delete op;
- }
-
- void set_left_child_node(operator_node* child) {
- left_child = child;
- if (child) {
- #ifdef PLAN_DAG
- child->parent_list.push_back(this);
- child->out_channel_list.push_back(0);
- #else
- child->parent = this;
- child->op->set_output_channel(0);
- #endif
- }
- }
-
- void set_right_child_node(operator_node* child) {
- right_child = child;
- if (child) {
- #ifdef PLAN_DAG
- child->parent_list.push_back(this);
- child->out_channel_list.push_back(1);
- #else
- child->parent = this;
- child->op->set_output_channel(1);
- #endif
- }
- }
-
- void set_left_lfta(lfta_info* l_lfta) {
- left_lfta = l_lfta;
- if (left_lfta) {
- #ifdef PLAN_DAG
- left_lfta->parent_list.push_back(this);
- left_lfta->out_channel_list.push_back(0);
- #else
- left_lfta->parent = this;
- left_lfta->output_channel = 0;
- #endif
- }
- }
-
- void set_right_lfta(lfta_info* r_lfta) {
- right_lfta = r_lfta;
- if (right_lfta) {
- #ifdef PLAN_DAG
- right_lfta->parent_list.push_back(this);
- right_lfta->out_channel_list.push_back(1);
- #else
- right_lfta->parent = this;
- right_lfta->output_channel = 1;
- #endif
- }
- }
-
-};
-
-
-
-int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
-void finalize_tuple(host_tuple &tup);
-void finalize_tuple(host_tuple &tup);
-gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
-gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
-gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
-gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
-
-gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
-gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
-gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
-gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
-
-struct UNOP_HFTA {
- struct FTA _fta;
- base_operator* oper;
- FTAID f;
- bool failed;
- gs_schemahandle_t schema_handle;
-
- list<host_tuple> output_queue;
-
- // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
- // lfta_name and an instance of the base_operator
- // We don't need to know the output schema as this information is already embeded
- // in create_output_tuple method of operators' functor.
- UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
- gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
-
- failed = false;
- oper = op;
- f = child_ftaid;
- schema_handle = sch_handle;
-
- // assign streamid
- _fta.ftaid = ftaid;
- _fta.ftaid.streamid = (gs_p_t)this;
-
-#ifdef DEBUG
- fprintf(stderr,"Instantiate a FTA\n");
-#endif
- /* extract lfta param block from hfta param block */
- list<param_block> param_list;
- get_lfta_params(sz, value, param_list);
- param_block param = param_list.front();
-
-
- gs_uint32_t reuse_flag = 2;
- // we will try to create a new instance of child FTA only if it is parameterized
- if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
- f.streamid = 0; // not interested in existing instances
- reuse_flag = 0;
- }
-
- if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
- fprintf(stderr,"HFTA::error:could instantiate a FTA");
- failed = true;
- return;
- }
-
- free(param.data);
- // set the operator's parameters
- if(oper->set_param_block(sz, (void*)value)) failed = true;;
-
-
- fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
- _fta.stream_subscribed_cnt = 1;
- _fta.stream_subscribed[0] = f;
-
- _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
- _fta.free_fta = UNOP_HFTA_free_fta;
- _fta.control_fta = UNOP_HFTA_control_fta;
- _fta.accept_packet = UNOP_HFTA_accept_packet;
- _fta.clock_fta = UNOP_HFTA_clock_fta;
- }
-
- ~UNOP_HFTA() {
- delete oper; // free operators memory
-
- }
-
- int flush() {
- list<host_tuple> res;
- if (!oper->flush(res)) {
-
- if (!res.empty()) {
- // go through the list of returned tuples and finalyze them
- list<host_tuple>::iterator iter = res.begin();
- while (iter != res.end()) {
- host_tuple& tup = *iter;
-
- // finalize the tuple
- if (tup.tuple_size)
- finalize_tuple(tup);
- iter++;
- }
-
- // append returned list to output_queue
- output_queue.splice(output_queue.end(), res);
-
-
- // post tuples
- while (!output_queue.empty()) {
- host_tuple& tup = output_queue.front();
- #ifdef DEBUG
- fprintf(stderr, "HFTA::about to post tuple\n");
- #endif
- if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
- tup.free_tuple();
- output_queue.pop_front();
- } else
- break;
- }
- }
- }
-
- return 0;
- }
-
- bool init_failed(){return failed;}
-};
-
-gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
- UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor
- // will be called on program exit
-
- if (recursive)
- // free instance we are subscribed to
- fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
-
- delete ftap;
- return 0;
-}
-
-gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
- UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
-
- if (command == FTA_COMMAND_FLUSH) {
- // ask lfta to do the flush
- fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
- ftap->flush();
-
- } else if (command == FTA_COMMAND_LOAD_PARAMS) {
- // ask lfta to do the flush
- fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
- ftap->flush();
-
- /* extract lfta param block from hfta param block */
- list<param_block> param_list;
- get_lfta_params(sz, value, param_list);
- param_block param = param_list.front();
- // load new parameters into lfta
- fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
- free(param.data);
-
- // notify the operator about the change of parameter
- ftap->oper->set_param_block(sz, (void*)value);
-
- } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
- // we no longer use temp_status commands
- // hearbeat mechanism is used instead
- }
- return 0;
-}
-
-gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
- UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
-#ifdef DEBUG
- fprintf(stderr, "HFTA::accepted packet\n");
-#endif
- if (!length) /* ignore null tuples */
- return 0;
-
- host_tuple temp;
- temp.tuple_size = length;
- temp.data = packet;
- temp.channel = 0;
- temp.heap_resident = false;
-
- // pass the tuple to operator
- list<host_tuple> res;
- int ret;
- fta_stat* tup_trace = NULL;
- gs_uint32_t tup_trace_sz = 0;
- gs_uint64_t trace_id = 0;
- bool temp_tuple_received = false;
-
-
- // if the tuple is temporal we need to extract the hearbeat payload
- if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
- temp_tuple_received = true;
- if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
- fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
- }
-
- if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
- /* perform a flush */
- ftap->flush();
-
- /* post eof_tuple to a parent */
- host_tuple eof_tuple;
- ftap->oper->get_temp_status(eof_tuple);
-
- /* last byte of the tuple specifies the tuple type
- * set it to EOF_TUPLE
- */
- *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
- hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
-
- return 0;
- }
-
- ret = ftap->oper->accept_tuple(temp, res);
-
- // go through the list of returned tuples and finalyze them
- list<host_tuple>::iterator iter = res.begin();
- while (iter != res.end()) {
- host_tuple& tup = *iter;
-
- // finalize the tuple
- if (tup.tuple_size)
- finalize_tuple(tup);
- iter++;
- }
-
- // if we received temporal tuple, last tuple of the result must be temporal too
- // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
- if (temp_tuple_received) {
- fta_stat stats;
- host_tuple& temp_tup = res.back();
-
-
- int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
- char* new_data = (char*)malloc(new_tuple_size);
- memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
- memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
-
- memset((char*)&stats, 0, sizeof(fta_stat));
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
-
- // Send a hearbeat message to clearinghouse.
- fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
-
- temp_tup.free_tuple();
- temp_tup.data = new_data;
- temp_tup.tuple_size = new_tuple_size;
- }
-
- // append returned list to output_queue
- ftap->output_queue.splice(ftap->output_queue.end(), res);
-
- // post tuples
- while (!ftap->output_queue.empty()) {
- host_tuple& tup = ftap->output_queue.front();
- #ifdef DEBUG
- fprintf(stderr, "HFTA::about to post tuple\n");
- #endif
- if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
- tup.free_tuple();
- ftap->output_queue.pop_front();
- } else
- break;
- }
-
- return 1;
-}
-
-gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
-
- // Send a hearbeat message to clearinghouse.to indicate we are alive
- fta_heartbeat(ftap->ftaid, 0, 0, 0);
-
- return 0;
-}
-
-
-struct MULTOP_HFTA {
- struct FTA _fta;
- gs_csp_t fta_name;
- gs_schemahandle_t schema_handle;
- operator_node* root;
- vector<operator_node*> sorted_nodes;
- int num_operators;
- list<lfta_info*> *lfta_list;
- /* number of eof tuples we received so far
- * receiving eof tuples from every source fta will cause a flush
- */
- int num_eof_tuples;
-
- bool failed;
- bool reusable;
-
- list<host_tuple> output_queue;
-
- // Runtime stats
- gs_uint32_t in_tuple_cnt;
- gs_uint32_t out_tuple_cnt;
- gs_uint32_t out_tuple_sz;
- gs_uint64_t cycle_cnt;
-
- gs_uint64_t trace_id;
-
- // memory occupied by output queue
- gs_uint32_t output_queue_mem;
-
-
- // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
- // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
- // as the schema handle is already passed during operator creation time.
- // We also don't need to know the output schema as this information is already embeded
- // in create_output_tuple method of operators' functor.
-
-
-
- MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
- list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
-
- fta_name = name;
- failed = false;
-
- root = node;
- lfta_list = lftas;
-
- // assign streamid
- _fta.ftaid = ftaid;
- _fta.ftaid.streamid = (gs_p_t)this;
-
- schema_handle = sch_handle;
-
- output_queue_mem = 0;
-
- // topologically sort the operators in the tree (or DAG)
- // for DAG we make sure we add the node to the sorted list only once
- operator_node* current_node;
- map<operator_node*, int> node_map;
- vector<operator_node*> node_list;
-
- int i = 0;
- node_list.push_back(root);
- node_map[root] = 0;
-
- num_operators = 1;
-
- while (i < node_list.size()) {
- current_node = node_list[i];
- if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
- node_map[current_node->left_child] = num_operators++;
- node_list.push_back(current_node->left_child);
- }
- if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
- node_map[current_node->right_child] = num_operators++;
- node_list.push_back(current_node->right_child);
- }
- i++;
- }
- num_operators = i;
-
- // build adjacency lists for query DAG
- list<int>* adj_lists = new list<int>[num_operators];
- bool* leaf_flags = new bool[num_operators];
- memset(leaf_flags, 0, num_operators * sizeof(bool));
- for (i = 0; i < num_operators; ++i) {
- current_node = node_list[i];
- if (current_node->left_child) {
- adj_lists[i].push_back(node_map[current_node->left_child]);
- }
- if (current_node->right_child && current_node->left_child != current_node->right_child) {
- adj_lists[i].push_back(node_map[current_node->right_child]);
- }
- }
-
- // run topolofical sort
- bool leaf_found = true;
- while (leaf_found) {
- leaf_found = false;
- // add all leafs to sorted_nodes
- for (i = 0; i < num_operators; ++i) {
- if (!leaf_flags[i] && adj_lists[i].empty()) {
- leaf_flags[i] = true;
- sorted_nodes.push_back(node_list[i]);
- leaf_found = true;
-
- // remove the node from its parents adjecency lists
- for (int j = 0; j < num_operators; ++j) {
- list<int>::iterator iter;
- for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
- if (*iter == i) {
- adj_lists[j].erase(iter);
- break;
- }
- }
- }
- }
- }
- }
-
- delete[] adj_lists;
- delete[] leaf_flags;
-
- // set the parameter block for every operator in tree
- for (i = 0; i < num_operators; ++i)
- if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
-
-#ifdef DEBUG
- fprintf(stderr,"Instantiate FTAs\n");
-#endif
- /* extract lfta param block from hfta param block */
-// NOTE: param_list must line up with lfta_list
- list<param_block> param_list;
- get_lfta_params(sz, value, param_list);
- list<param_block>::iterator iter1;
- list<lfta_info*>::iterator iter2 = lfta_list->begin();
-
- for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
- lfta_info* inf = *iter2;
-
- #ifdef DEBUG
- fprintf(stderr,"Instantiate a FTA\n");
- #endif
-
- gs_uint32_t reuse_flag = 2;
-
- // we will try to create a new instance of child FTA only if it is parameterized
- if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
- (*iter2)->f.streamid = 0; // not interested in existing instances
- reuse_flag = 0;
- }
- if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
- fprintf(stderr,"HFTA::error:could instantiate a FTA");
- failed = true;
- return;
- }
-
- free((*iter1).data);
-
- //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
-
- _fta.stream_subscribed[i]=(*iter2)->f;
- }
- _fta.stream_subscribed_cnt = i;
-
- num_eof_tuples = 0;
-
- _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
- _fta.free_fta = MULTOP_HFTA_free_fta;
- _fta.control_fta = MULTOP_HFTA_control_fta;
- _fta.accept_packet = MULTOP_HFTA_accept_packet;
- _fta.clock_fta = MULTOP_HFTA_clock_fta;
-
- // init runtime stats
- in_tuple_cnt = 0;
- out_tuple_cnt = 0;
- out_tuple_sz = 0;
- cycle_cnt = 0;
-
- }
-
- ~MULTOP_HFTA() {
-
- list<lfta_info*>::iterator iter;
- int i = 0;
-
- for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
- delete *iter;
- }
-
- delete root; // free operators memory
- delete lfta_list;
-
-
- }
-
- int flush() {
-
- list<host_tuple> res;
-
- // go through the list of operators in topological order
- // and flush them
- list<host_tuple>::iterator iter;
- list<host_tuple> temp_output_queue;
-
- for (int i = 0; i < num_operators; ++i) {
- operator_node* node = sorted_nodes[i];
-
-#ifdef PLAN_DAG
- list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
-#else
- // for trees we can put output tuples directly into parent's input buffer
- list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
-#endif
- // consume tuples waiting in your queue
- for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
- node->op->accept_tuple(*(iter), current_output_queue);
- }
- node->op->flush(current_output_queue);
- node->input_queue.clear();
-
-#ifdef PLAN_DAG
- // copy the tuples from output queue into input queues of all parents
- list<operator_node*>::iterator node_iter;
-
- if (!node->parent_list.empty()) {
- // append the content of the output queue to parent input queue
-
- for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
- int* ref_cnt = 0;
- if (node->parent_list.size() > 1) {
- ref_cnt = (int*)malloc(sizeof(int));
- *ref_cnt = node->parent_list.size() - 1;
- }
-
- for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
- (*iter).ref_cnt = ref_cnt;
- (*node_iter)->input_queue.push_back(*iter);
- }
- }
- }
-#endif
- }
-
- if (!res.empty()) {
- // go through the list of returned tuples and finalyze them
- list<host_tuple>::iterator iter = res.begin();
- while (iter != res.end()) {
- host_tuple& tup = *iter;
-
- // finalize the tuple
- if (tup.tuple_size)
- finalize_tuple(tup);
-
- output_queue_mem += tup.tuple_size;
- iter++;
- }
-
- // append returned list to output_queue
- output_queue.splice(output_queue.end(), res);
-
-
- // post tuples
- while (!output_queue.empty()) {
- host_tuple& tup = output_queue.front();
- #ifdef DEBUG
- fprintf(stderr, "HFTA::about to post tuple\n");
- #endif
- if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
- output_queue_mem -= tup.tuple_size;
- tup.free_tuple();
- output_queue.pop_front();
- } else
- break;
- }
- }
-
- return 0;
- }
-
- bool init_failed(){return failed;}
-};
-
-
-gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor
- // will be called on program exit
-
- if (recursive) {
- // free instance we are subscribed to
- list<lfta_info*>::iterator iter;
- int i = 0;
-
- for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
- fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
- }
- }
-
- delete ftap;
- return 0;
-}
-
-gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
-
- if (command == FTA_COMMAND_FLUSH) {
-
- // ask lftas to do the flush
- list<lfta_info*>::iterator iter;
- for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
- fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
- }
- // flush hfta operators
- ftap->flush();
-
- } else if (command == FTA_COMMAND_LOAD_PARAMS) {
-
- list<param_block> param_list;
- get_lfta_params(sz, value, param_list);
-
- // ask lftas to do the flush and set new parameters
- list<lfta_info*>::iterator iter;
- list<param_block>::iterator iter2;
- for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
- fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
- fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
- free((*iter2).data);
- }
- // flush hfta operators
- ftap->flush();
-
- // set the new parameter block for every operator in tree
- for (int i = 0; i < ftap->num_operators; ++i)
- ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
-
- } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
- // we no longer use temp_status commands
- // hearbeat mechanism is used instead
- }
- return 0;
-}
-
-gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
-
- gs_uint64_t start_cycle = rdtsc();
-#ifdef DEBUG
- fprintf(stderr, "HFTA::accepted packet\n");
-#endif
- if (!length) /* ignore null tuples */
- return 0;
-
- ftap->in_tuple_cnt++;
-
- host_tuple temp;
- temp.tuple_size = length;
- temp.data = packet;
- temp.channel = 0;
- temp.heap_resident = false;
-
-// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
-
- // find from which lfta the tuple came
- list<lfta_info*>::iterator iter;
- lfta_info* inf = NULL;
- int i;
-
- for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
- if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&
- ftap->_fta.stream_subscribed[i].port == ftaid->port &&
- ftap->_fta.stream_subscribed[i].index == ftaid->index &&
- ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
- inf = *iter;
- break;
- }
- }
-
- if (!inf) {
- fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
- exit(1);
- }
-
- // route tuple through operator chain
- list<host_tuple> result;
- host_tuple tup;
- int ret;
- #ifndef PLAN_DAG
- temp.channel = inf->output_channel;
- #endif
- operator_node* current_node = NULL, *child = NULL;
- list<host_tuple> temp_output_queue;
-
-
- fta_stat* tup_trace = NULL;
- gs_uint32_t tup_trace_sz = 0;
- gs_uint64_t trace_id = 0;
- bool temp_tuple_received = false;
-
- // if the tuple is temporal we need to extract the heartbeat payload
- if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
- temp_tuple_received = true;
- if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
- fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
- }
-
- if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
-
- if (++ftap->num_eof_tuples < ftap->lfta_list->size())
- return 0;
-
- ftap->num_eof_tuples = 0;
-
- /* perform a flush */
- ftap->flush();
-
- /* post eof_tuple to a parent */
- host_tuple eof_tuple;
- ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
-
- /* last byte of the tuple specify the tuple type
- * set it to EOF_TUPLE
- */
- *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
- hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
- ftap->out_tuple_cnt++;
- ftap->out_tuple_sz+=eof_tuple.tuple_size;
-
- return 0;
- }
-
- list<host_tuple>::iterator iter2;
-
-#ifdef PLAN_DAG
-
- // push tuple to all parent operators of the lfta
- list<operator_node*>::iterator node_iter;
- list<unsigned>::iterator chan_iter;
- for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
- temp.channel = *chan_iter;
- (*node_iter)->input_queue.push_back(temp);
- }
-
- for (i = 0; i < ftap->num_operators; ++i) {
-
- operator_node* node = ftap->sorted_nodes[i];
- list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
-
- // consume tuples waiting in your queue
- for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
- node->op->accept_tuple(*(iter2), current_output_queue);
- }
- node->input_queue.clear();
-
- // copy the tuples from output queue into input queues of all parents
-
- if (!node->parent_list.empty()) {
-
- // append the content of the output queue to parent input queue
- for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
-
- int* ref_cnt = 0;
- if (node->parent_list.size() > 1) {
- ref_cnt = (int*)malloc(sizeof(int));
- *ref_cnt = node->parent_list.size() - 1;
- }
-
- for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
- (*iter2).ref_cnt = ref_cnt;
- (*iter2).channel = *chan_iter;
- (*node_iter)->input_queue.push_back(*iter2);
- }
- }
- }
- temp_output_queue.clear();
- }
-#else
- current_node = inf->parent;
-
-// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
- current_node->input_queue.push_back(temp);
-
- do {
-//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
- list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
-
- // consume tuples waiting in your queue
- for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
- current_node->op->accept_tuple((*iter2),current_output_queue);
- }
-// All consumed, delete them
- current_node->input_queue.clear();
- current_node = current_node->parent;
-
- } while (current_node);
-#endif
-
-
- host_tuple temp_tup;
-
- bool no_temp_tuple = false;
-
- // if we received temporal tuple, last tuple of the result must be temporal too
- // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
- if (temp_tuple_received) {
-
- if (result.empty()) {
- no_temp_tuple = true;
-
- } else {
- fta_stat stats;
- temp_tup = result.back();
- finalize_tuple(temp_tup);
-
- int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
- char* new_data = (char*)malloc(new_tuple_size);
- memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
- memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
-
-
- memset((char*)&stats, 0, sizeof(fta_stat));
- stats.ftaid = fta->ftaid;
- stats.in_tuple_cnt = ftap->in_tuple_cnt;
- stats.out_tuple_cnt = ftap->out_tuple_cnt;
- stats.out_tuple_sz = ftap->out_tuple_sz;
- stats.cycle_cnt = ftap->cycle_cnt;
- memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
-
- // Send a hearbeat message to clearinghouse.
- fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
-
- // reset the stats
- ftap->in_tuple_cnt = 0;
- ftap->out_tuple_cnt = 0;
- ftap->out_tuple_sz = 0;
- ftap->cycle_cnt = 0;
-
- free(temp_tup.data);
- temp_tup.data = new_data;
- temp_tup.tuple_size = new_tuple_size;
- result.pop_back();
- }
- }
-
- // go through the list of returned tuples and finalyze them
- // since we can produce multiple temporal tuples in DAG plans
- // we can drop all of them except the last one
- iter2 = result.begin();
- while(iter2 != result.end()) {
- host_tuple tup = *iter2;
-
- // finalize the tuple
- if (tup.tuple_size) {
- finalize_tuple(tup);
-
- #ifdef PLAN_DAG
- if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
- tup.free_tuple();
- else
- #endif
- {
- ftap->output_queue.push_back(tup);
- ftap->output_queue_mem += tup.tuple_size;
- }
-
- }
- iter2++;
- }
-
- // append returned list to output_queue
- // ftap->output_queue.splice(ftap->output_queue.end(), result);
-
- if (temp_tuple_received && !no_temp_tuple) {
- ftap->output_queue.push_back(temp_tup);
- ftap->output_queue_mem += temp_tup.tuple_size;
- }
-
- // post tuples
- while (!ftap->output_queue.empty()) {
- host_tuple& tup = ftap->output_queue.front();
- #ifdef DEBUG
- fprintf(stderr, "HFTA::about to post tuple\n");
- #endif
- if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
- ftap->out_tuple_cnt++;
- ftap->out_tuple_sz+=tup.tuple_size;
- ftap->output_queue_mem -= tup.tuple_size;
- tup.free_tuple();
- ftap->output_queue.pop_front();
- } else
- break;
- }
-
- ftap->cycle_cnt += rdtsc() - start_cycle;
-
- return 1;
-}
-
-gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
- MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
-
-#ifdef HFTA_PROFILE
- /* Print stats */
- fprintf(stderr, "FTA = %s|", ftap->fta_name);
- fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
- fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
- fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
- fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
-
-
- fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
- unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
-
- for (int i = 1; i < ftap->num_operators; ++i) {
- operator_node* node = ftap->sorted_nodes[i];
- fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
- total_mem += node->op->get_mem_footprint();
- }
- fprintf(stderr, ", total = %d|", total_mem );
- fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
-#endif
-
- fta_stat stats;
- memset((char*)&stats, 0, sizeof(fta_stat));
- stats.ftaid = fta->ftaid;
- stats.in_tuple_cnt = ftap->in_tuple_cnt;
- stats.out_tuple_cnt = ftap->out_tuple_cnt;
- stats.out_tuple_sz = ftap->out_tuple_sz;
- stats.cycle_cnt = ftap->cycle_cnt;
-
- // Send a hearbeat message to clearinghouse.
- fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
-
- // resets runtime stats
- ftap->in_tuple_cnt = 0;
- ftap->out_tuple_cnt = 0;
- ftap->out_tuple_sz = 0;
- ftap->cycle_cnt = 0;
-
- return 0;
-}
-
-
-#endif // __HFTA_H
-
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#ifndef __HFTA_H\r
+#define __HFTA_H\r
+\r
+#include "gstypes.h"\r
+#include "host_tuple.h"\r
+#include "base_operator.h"\r
+#include <vector>\r
+#include <map>\r
+#include<math.h>\r
+#include "rdtsc.h"\r
+using namespace std;\r
+\r
+#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))\r
+#define LLMIN(x,y) ((x)<(y)?(x):(y))\r
+#define LLMAX(x,y) ((x)<(y)?(y):(x))\r
+#define UMIN(x,y) ((x)<(y)?(x):(y))\r
+#define UMAX(x,y) ((x)<(y)?(y):(x))\r
+#define EQ(x,y) ((x)==(y))\r
+#define GEQ(x,y) ((x)>=(y))\r
+#define LEQ(x,y) ((x)<=(y))\r
+// Cast away temporality\r
+#define non_temporal(x)(x)\r
+// Access math libraries\r
+#define sqrt(x) sqrt(x)\r
+\r
+\r
+extern "C" {\r
+#include <lapp.h>\r
+#include <fta.h>\r
+#include <stdlib.h>\r
+#include <stdio.h>\r
+#include <schemaparser.h>\r
+}\r
+\r
+struct param_block {\r
+ gs_int32_t block_length;\r
+ void* data;\r
+};\r
+\r
+// forward declaration of operator_node\r
+struct operator_node;\r
+\r
+struct lfta_info {\r
+ gs_schemahandle_t schema_handle;\r
+ gs_sp_t schema;\r
+ gs_sp_t fta_name;\r
+#ifdef PLAN_DAG\r
+ list<operator_node*> parent_list;\r
+ list<unsigned> out_channel_list;\r
+#else\r
+ operator_node* parent;\r
+ unsigned output_channel;\r
+#endif\r
+ FTAID f;\r
+\r
+ lfta_info() {\r
+ schema_handle = -1;\r
+ schema = NULL;\r
+ #ifndef PLAN_DAG\r
+ parent = NULL;\r
+ output_channel = 0;\r
+ #endif\r
+ }\r
+\r
+ ~lfta_info() {\r
+ if (fta_name)\r
+ free (fta_name);\r
+ if (schema)\r
+ free (schema);\r
+ if (schema_handle >= 0)\r
+ ftaschema_free(schema_handle);\r
+ }\r
+};\r
+\r
+\r
+struct operator_node {\r
+ base_operator* op;\r
+\r
+#ifdef PLAN_DAG\r
+ list<operator_node*> parent_list;\r
+ list<unsigned> out_channel_list;\r
+#else\r
+ operator_node* parent;\r
+#endif\r
+\r
+ operator_node* left_child;\r
+ operator_node* right_child;\r
+ lfta_info* left_lfta;\r
+ lfta_info* right_lfta;\r
+\r
+ list<host_tuple> input_queue;\r
+\r
+ operator_node(base_operator* o) {\r
+ op = o;\r
+ #ifndef PLAN_DAG\r
+ parent = NULL;\r
+ #endif\r
+ left_child = right_child = NULL;\r
+ left_lfta = right_lfta = NULL;\r
+ }\r
+\r
+ ~operator_node() {\r
+ delete op;\r
+ }\r
+\r
+ void set_left_child_node(operator_node* child) {\r
+ left_child = child;\r
+ if (child) {\r
+ #ifdef PLAN_DAG\r
+ child->parent_list.push_back(this);\r
+ child->out_channel_list.push_back(0);\r
+ #else\r
+ child->parent = this;\r
+ child->op->set_output_channel(0);\r
+ #endif\r
+ }\r
+ }\r
+\r
+ void set_right_child_node(operator_node* child) {\r
+ right_child = child;\r
+ if (child) {\r
+ #ifdef PLAN_DAG\r
+ child->parent_list.push_back(this);\r
+ child->out_channel_list.push_back(1);\r
+ #else\r
+ child->parent = this;\r
+ child->op->set_output_channel(1);\r
+ #endif\r
+ }\r
+ }\r
+\r
+ void set_left_lfta(lfta_info* l_lfta) {\r
+ left_lfta = l_lfta;\r
+ if (left_lfta) {\r
+ #ifdef PLAN_DAG\r
+ left_lfta->parent_list.push_back(this);\r
+ left_lfta->out_channel_list.push_back(0);\r
+ #else\r
+ left_lfta->parent = this;\r
+ left_lfta->output_channel = 0;\r
+ #endif\r
+ }\r
+ }\r
+\r
+ void set_right_lfta(lfta_info* r_lfta) {\r
+ right_lfta = r_lfta;\r
+ if (right_lfta) {\r
+ #ifdef PLAN_DAG\r
+ right_lfta->parent_list.push_back(this);\r
+ right_lfta->out_channel_list.push_back(1);\r
+ #else\r
+ right_lfta->parent = this;\r
+ right_lfta->output_channel = 1;\r
+ #endif\r
+ }\r
+ }\r
+\r
+};\r
+\r
+\r
+\r
+int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);\r
+void finalize_tuple(host_tuple &tup);\r
+void finalize_tuple(host_tuple &tup);\r
+gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
+gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
+gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
+gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);\r
+\r
+gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);\r
+gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);\r
+gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);\r
+gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);\r
+\r
+struct UNOP_HFTA {\r
+ struct FTA _fta;\r
+ base_operator* oper;\r
+ FTAID f;\r
+ bool failed;\r
+ gs_schemahandle_t schema_handle;\r
+\r
+ list<host_tuple> output_queue;\r
+\r
+ // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
+ // lfta_name and an instance of the base_operator\r
+ // We don't need to know the output schema as this information is already embeded\r
+ // in create_output_tuple method of operators' functor.\r
+ UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,\r
+ gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {\r
+\r
+ failed = false;\r
+ oper = op;\r
+ f = child_ftaid;\r
+ schema_handle = sch_handle;\r
+\r
+ // assign streamid\r
+ _fta.ftaid = ftaid;\r
+ _fta.ftaid.streamid = (gs_p_t)this;\r
+\r
+#ifdef DEBUG\r
+ fprintf(stderr,"Instantiate a FTA\n");\r
+#endif\r
+ /* extract lfta param block from hfta param block */\r
+ list<param_block> param_list;\r
+ get_lfta_params(sz, value, param_list);\r
+ param_block param = param_list.front();\r
+\r
+\r
+ gs_uint32_t reuse_flag = 2;\r
+ // we will try to create a new instance of child FTA only if it is parameterized\r
+ if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
+ f.streamid = 0; // not interested in existing instances\r
+ reuse_flag = 0;\r
+ }\r
+\r
+ if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {\r
+ fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
+ failed = true;\r
+ return;\r
+ }\r
+\r
+ free(param.data);\r
+ // set the operator's parameters\r
+ if(oper->set_param_block(sz, (void*)value)) failed = true;;\r
+\r
+\r
+ fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);\r
+ _fta.stream_subscribed_cnt = 1;\r
+ _fta.stream_subscribed[0] = f;\r
+\r
+ _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)\r
+ _fta.free_fta = UNOP_HFTA_free_fta;\r
+ _fta.control_fta = UNOP_HFTA_control_fta;\r
+ _fta.accept_packet = UNOP_HFTA_accept_packet;\r
+ _fta.clock_fta = UNOP_HFTA_clock_fta;\r
+ }\r
+\r
+ ~UNOP_HFTA() {\r
+ delete oper; // free operators memory\r
+\r
+ }\r
+\r
+ int flush() {\r
+ list<host_tuple> res;\r
+ if (!oper->flush(res)) {\r
+\r
+ if (!res.empty()) {\r
+ // go through the list of returned tuples and finalyze them\r
+ list<host_tuple>::iterator iter = res.begin();\r
+ while (iter != res.end()) {\r
+ host_tuple& tup = *iter;\r
+\r
+ // finalize the tuple\r
+ if (tup.tuple_size)\r
+ finalize_tuple(tup);\r
+ iter++;\r
+ }\r
+\r
+ // append returned list to output_queue\r
+ output_queue.splice(output_queue.end(), res);\r
+\r
+\r
+ // post tuples\r
+ while (!output_queue.empty()) {\r
+ host_tuple& tup = output_queue.front();\r
+ #ifdef DEBUG\r
+ fprintf(stderr, "HFTA::about to post tuple\n");\r
+ #endif\r
+ if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
+ tup.free_tuple();\r
+ output_queue.pop_front();\r
+ } else\r
+ break;\r
+ }\r
+ }\r
+ }\r
+\r
+ return 0;\r
+ }\r
+\r
+ bool init_failed(){return failed;}\r
+};\r
+\r
+gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
+ UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor\r
+ // will be called on program exit\r
+\r
+ if (recursive)\r
+ // free instance we are subscribed to\r
+ fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);\r
+\r
+ delete ftap;\r
+ return 0;\r
+}\r
+\r
+gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
+ UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
+\r
+ if (command == FTA_COMMAND_FLUSH) {\r
+ // ask lfta to do the flush\r
+ fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
+ ftap->flush();\r
+\r
+ } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
+ // ask lfta to do the flush\r
+ fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);\r
+ ftap->flush();\r
+\r
+ /* extract lfta param block from hfta param block */\r
+ list<param_block> param_list;\r
+ get_lfta_params(sz, value, param_list);\r
+ param_block param = param_list.front();\r
+ // load new parameters into lfta\r
+ fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);\r
+ free(param.data);\r
+\r
+ // notify the operator about the change of parameter\r
+ ftap->oper->set_param_block(sz, (void*)value);\r
+\r
+ } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
+ // we no longer use temp_status commands\r
+ // hearbeat mechanism is used instead\r
+ }\r
+ return 0;\r
+}\r
+\r
+gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
+ UNOP_HFTA* ftap = (UNOP_HFTA*)fta;\r
+#ifdef DEBUG\r
+ fprintf(stderr, "HFTA::accepted packet\n");\r
+#endif\r
+ if (!length) /* ignore null tuples */\r
+ return 0;\r
+\r
+ host_tuple temp;\r
+ temp.tuple_size = length;\r
+ temp.data = packet;\r
+ temp.channel = 0;\r
+ temp.heap_resident = false;\r
+\r
+ // pass the tuple to operator\r
+ list<host_tuple> res;\r
+ int ret;\r
+ fta_stat* tup_trace = NULL;\r
+ gs_uint32_t tup_trace_sz = 0;\r
+ gs_uint64_t trace_id = 0;\r
+ bool temp_tuple_received = false;\r
+\r
+\r
+ // if the tuple is temporal we need to extract the hearbeat payload\r
+ if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {\r
+ temp_tuple_received = true;\r
+ if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
+ fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
+ }\r
+\r
+ if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {\r
+ /* perform a flush */\r
+ ftap->flush();\r
+\r
+ /* post eof_tuple to a parent */\r
+ host_tuple eof_tuple;\r
+ ftap->oper->get_temp_status(eof_tuple);\r
+\r
+ /* last byte of the tuple specifies the tuple type\r
+ * set it to EOF_TUPLE\r
+ */\r
+ *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
+ hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
+\r
+ return 0;\r
+ }\r
+\r
+ ret = ftap->oper->accept_tuple(temp, res);\r
+\r
+ // go through the list of returned tuples and finalyze them\r
+ list<host_tuple>::iterator iter = res.begin();\r
+ while (iter != res.end()) {\r
+ host_tuple& tup = *iter;\r
+\r
+ // finalize the tuple\r
+ if (tup.tuple_size)\r
+ finalize_tuple(tup);\r
+ iter++;\r
+ }\r
+\r
+ // if we received temporal tuple, last tuple of the result must be temporal too\r
+ // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
+ if (temp_tuple_received) {\r
+ fta_stat stats;\r
+ host_tuple& temp_tup = res.back();\r
+\r
+\r
+ int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
+ char* new_data = (char*)malloc(new_tuple_size);\r
+ memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
+ memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));\r
+\r
+ memset((char*)&stats, 0, sizeof(fta_stat));\r
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
+\r
+ // Send a hearbeat message to clearinghouse.\r
+ fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
+\r
+ temp_tup.free_tuple();\r
+ temp_tup.data = new_data;\r
+ temp_tup.tuple_size = new_tuple_size;\r
+ }\r
+\r
+ // append returned list to output_queue\r
+ ftap->output_queue.splice(ftap->output_queue.end(), res);\r
+\r
+ // post tuples\r
+ while (!ftap->output_queue.empty()) {\r
+ host_tuple& tup = ftap->output_queue.front();\r
+ #ifdef DEBUG\r
+ fprintf(stderr, "HFTA::about to post tuple\n");\r
+ #endif\r
+ if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
+ tup.free_tuple();\r
+ ftap->output_queue.pop_front();\r
+ } else\r
+ break;\r
+ }\r
+\r
+ return 1;\r
+}\r
+\r
+gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {\r
+\r
+ // Send a hearbeat message to clearinghouse.to indicate we are alive\r
+ fta_heartbeat(ftap->ftaid, 0, 0, 0);\r
+\r
+ return 0;\r
+}\r
+\r
+\r
+struct MULTOP_HFTA {\r
+ struct FTA _fta;\r
+ gs_csp_t fta_name;\r
+ gs_schemahandle_t schema_handle;\r
+ operator_node* root;\r
+ vector<operator_node*> sorted_nodes;\r
+ int num_operators;\r
+ list<lfta_info*> *lfta_list;\r
+ /* number of eof tuples we received so far\r
+ * receiving eof tuples from every source fta will cause a flush\r
+ */\r
+ int num_eof_tuples;\r
+\r
+ bool failed;\r
+ bool reusable;\r
+\r
+ list<host_tuple> output_queue;\r
+\r
+ // Runtime stats\r
+ gs_uint32_t in_tuple_cnt;\r
+ gs_uint32_t out_tuple_cnt;\r
+ gs_uint32_t out_tuple_sz;\r
+ gs_uint64_t cycle_cnt;\r
+\r
+ gs_uint64_t trace_id;\r
+\r
+ // memory occupied by output queue\r
+ gs_uint32_t output_queue_mem;\r
+\r
+\r
+ // To create an hfta we will need all the parameters passed to alloc_fta by host library plus\r
+ // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,\r
+ // as the schema handle is already passed during operator creation time.\r
+ // We also don't need to know the output schema as this information is already embeded\r
+ // in create_output_tuple method of operators' functor.\r
+\r
+\r
+\r
+ MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,\r
+ list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {\r
+\r
+ fta_name = name;\r
+ failed = false;\r
+\r
+ root = node;\r
+ lfta_list = lftas;\r
+\r
+ // assign streamid\r
+ _fta.ftaid = ftaid;\r
+ _fta.ftaid.streamid = (gs_p_t)this;\r
+\r
+ schema_handle = sch_handle;\r
+\r
+ output_queue_mem = 0;\r
+\r
+ // topologically sort the operators in the tree (or DAG)\r
+ // for DAG we make sure we add the node to the sorted list only once\r
+ operator_node* current_node;\r
+ map<operator_node*, int> node_map;\r
+ vector<operator_node*> node_list;\r
+\r
+ int i = 0;\r
+ node_list.push_back(root);\r
+ node_map[root] = 0;\r
+\r
+ num_operators = 1;\r
+\r
+ while (i < node_list.size()) {\r
+ current_node = node_list[i];\r
+ if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {\r
+ node_map[current_node->left_child] = num_operators++;\r
+ node_list.push_back(current_node->left_child);\r
+ }\r
+ if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {\r
+ node_map[current_node->right_child] = num_operators++;\r
+ node_list.push_back(current_node->right_child);\r
+ }\r
+ i++;\r
+ }\r
+ num_operators = i;\r
+\r
+ // build adjacency lists for query DAG\r
+ list<int>* adj_lists = new list<int>[num_operators];\r
+ bool* leaf_flags = new bool[num_operators];\r
+ memset(leaf_flags, 0, num_operators * sizeof(bool));\r
+ for (i = 0; i < num_operators; ++i) {\r
+ current_node = node_list[i];\r
+ if (current_node->left_child) {\r
+ adj_lists[i].push_back(node_map[current_node->left_child]);\r
+ }\r
+ if (current_node->right_child && current_node->left_child != current_node->right_child) {\r
+ adj_lists[i].push_back(node_map[current_node->right_child]);\r
+ }\r
+ }\r
+\r
+ // run topolofical sort\r
+ bool leaf_found = true;\r
+ while (leaf_found) {\r
+ leaf_found = false;\r
+ // add all leafs to sorted_nodes\r
+ for (i = 0; i < num_operators; ++i) {\r
+ if (!leaf_flags[i] && adj_lists[i].empty()) {\r
+ leaf_flags[i] = true;\r
+ sorted_nodes.push_back(node_list[i]);\r
+ leaf_found = true;\r
+\r
+ // remove the node from its parents adjecency lists\r
+ for (int j = 0; j < num_operators; ++j) {\r
+ list<int>::iterator iter;\r
+ for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {\r
+ if (*iter == i) {\r
+ adj_lists[j].erase(iter);\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ delete[] adj_lists;\r
+ delete[] leaf_flags;\r
+\r
+ // set the parameter block for every operator in tree\r
+ for (i = 0; i < num_operators; ++i)\r
+ if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;\r
+\r
+#ifdef DEBUG\r
+ fprintf(stderr,"Instantiate FTAs\n");\r
+#endif\r
+ /* extract lfta param block from hfta param block */\r
+// NOTE: param_list must line up with lfta_list\r
+ list<param_block> param_list;\r
+ get_lfta_params(sz, value, param_list);\r
+ list<param_block>::iterator iter1;\r
+ list<lfta_info*>::iterator iter2 = lfta_list->begin();\r
+\r
+ for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {\r
+ lfta_info* inf = *iter2;\r
+\r
+ #ifdef DEBUG\r
+ fprintf(stderr,"Instantiate a FTA\n");\r
+ #endif\r
+\r
+ gs_uint32_t reuse_flag = 2;\r
+\r
+ // we will try to create a new instance of child FTA only if it is parameterized\r
+ if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {\r
+ (*iter2)->f.streamid = 0; // not interested in existing instances\r
+ reuse_flag = 0;\r
+ }\r
+ if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {\r
+ fprintf(stderr,"HFTA::error:could instantiate a FTA");\r
+ failed = true;\r
+ return;\r
+ }\r
+\r
+ free((*iter1).data);\r
+\r
+ //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");\r
+\r
+ _fta.stream_subscribed[i]=(*iter2)->f;\r
+ }\r
+ _fta.stream_subscribed_cnt = i;\r
+\r
+ num_eof_tuples = 0;\r
+\r
+ _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)\r
+ _fta.free_fta = MULTOP_HFTA_free_fta;\r
+ _fta.control_fta = MULTOP_HFTA_control_fta;\r
+ _fta.accept_packet = MULTOP_HFTA_accept_packet;\r
+ _fta.clock_fta = MULTOP_HFTA_clock_fta;\r
+\r
+ // init runtime stats\r
+ in_tuple_cnt = 0;\r
+ out_tuple_cnt = 0;\r
+ out_tuple_sz = 0;\r
+ cycle_cnt = 0;\r
+\r
+ }\r
+\r
+ ~MULTOP_HFTA() {\r
+\r
+ list<lfta_info*>::iterator iter;\r
+ int i = 0;\r
+\r
+ for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {\r
+ delete *iter;\r
+ }\r
+\r
+ delete root; // free operators memory\r
+ delete lfta_list;\r
+\r
+\r
+ }\r
+\r
+ int flush() {\r
+\r
+ list<host_tuple> res;\r
+\r
+ // go through the list of operators in topological order\r
+ // and flush them\r
+ list<host_tuple>::iterator iter;\r
+ list<host_tuple> temp_output_queue;\r
+\r
+ for (int i = 0; i < num_operators; ++i) {\r
+ operator_node* node = sorted_nodes[i];\r
+\r
+#ifdef PLAN_DAG\r
+ list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;\r
+#else\r
+ // for trees we can put output tuples directly into parent's input buffer\r
+ list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;\r
+#endif\r
+ // consume tuples waiting in your queue\r
+ for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {\r
+ node->op->accept_tuple(*(iter), current_output_queue);\r
+ }\r
+ node->op->flush(current_output_queue);\r
+ node->input_queue.clear();\r
+\r
+#ifdef PLAN_DAG\r
+ // copy the tuples from output queue into input queues of all parents\r
+ list<operator_node*>::iterator node_iter;\r
+\r
+ if (!node->parent_list.empty()) {\r
+ // append the content of the output queue to parent input queue\r
+\r
+ for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {\r
+ int* ref_cnt = 0;\r
+ if (node->parent_list.size() > 1) {\r
+ ref_cnt = (int*)malloc(sizeof(int));\r
+ *ref_cnt = node->parent_list.size() - 1;\r
+ }\r
+\r
+ for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {\r
+ (*iter).ref_cnt = ref_cnt;\r
+ (*node_iter)->input_queue.push_back(*iter);\r
+ }\r
+ }\r
+ }\r
+#endif\r
+ }\r
+\r
+ if (!res.empty()) {\r
+ // go through the list of returned tuples and finalyze them\r
+ list<host_tuple>::iterator iter = res.begin();\r
+ while (iter != res.end()) {\r
+ host_tuple& tup = *iter;\r
+\r
+ // finalize the tuple\r
+ if (tup.tuple_size)\r
+ finalize_tuple(tup);\r
+\r
+ output_queue_mem += tup.tuple_size;\r
+ iter++;\r
+ }\r
+\r
+ // append returned list to output_queue\r
+ output_queue.splice(output_queue.end(), res);\r
+\r
+\r
+ // post tuples\r
+ while (!output_queue.empty()) {\r
+ host_tuple& tup = output_queue.front();\r
+ #ifdef DEBUG\r
+ fprintf(stderr, "HFTA::about to post tuple\n");\r
+ #endif\r
+ if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {\r
+ output_queue_mem -= tup.tuple_size;\r
+ tup.free_tuple();\r
+ output_queue.pop_front();\r
+ } else\r
+ break;\r
+ }\r
+ }\r
+\r
+ return 0;\r
+ }\r
+\r
+ bool init_failed(){return failed;}\r
+};\r
+\r
+\r
+gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {\r
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor\r
+ // will be called on program exit\r
+\r
+ if (recursive) {\r
+ // free instance we are subscribed to\r
+ list<lfta_info*>::iterator iter;\r
+ int i = 0;\r
+\r
+ for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {\r
+ fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);\r
+ }\r
+ }\r
+\r
+ delete ftap;\r
+ return 0;\r
+}\r
+\r
+gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {\r
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
+\r
+ if (command == FTA_COMMAND_FLUSH) {\r
+\r
+ // ask lftas to do the flush\r
+ list<lfta_info*>::iterator iter;\r
+ for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {\r
+ fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);\r
+ }\r
+ // flush hfta operators\r
+ ftap->flush();\r
+\r
+ } else if (command == FTA_COMMAND_LOAD_PARAMS) {\r
+\r
+ list<param_block> param_list;\r
+ get_lfta_params(sz, value, param_list);\r
+\r
+ // ask lftas to do the flush and set new parameters\r
+ list<lfta_info*>::iterator iter;\r
+ list<param_block>::iterator iter2;\r
+ for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {\r
+ fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);\r
+ fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);\r
+ free((*iter2).data);\r
+ }\r
+ // flush hfta operators\r
+ ftap->flush();\r
+\r
+ // set the new parameter block for every operator in tree\r
+ for (int i = 0; i < ftap->num_operators; ++i)\r
+ ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);\r
+\r
+ } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {\r
+ // we no longer use temp_status commands\r
+ // hearbeat mechanism is used instead\r
+ }\r
+ return 0;\r
+}\r
+\r
+gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {\r
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
+\r
+ gs_uint64_t start_cycle = rdtsc();\r
+#ifdef DEBUG\r
+ fprintf(stderr, "HFTA::accepted packet\n");\r
+#endif\r
+ if (!length) /* ignore null tuples */\r
+ return 0;\r
+\r
+ ftap->in_tuple_cnt++;\r
+\r
+ host_tuple temp;\r
+ temp.tuple_size = length;\r
+ temp.data = packet;\r
+ temp.channel = 0;\r
+ temp.heap_resident = false;\r
+\r
+// fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
+\r
+ // find from which lfta the tuple came\r
+ list<lfta_info*>::iterator iter;\r
+ lfta_info* inf = NULL;\r
+ int i;\r
+\r
+ for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {\r
+ if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&\r
+ ftap->_fta.stream_subscribed[i].port == ftaid->port &&\r
+ ftap->_fta.stream_subscribed[i].index == ftaid->index &&\r
+ ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {\r
+ inf = *iter;\r
+ break;\r
+ }\r
+ }\r
+\r
+ if (!inf) {\r
+ fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");\r
+ exit(1);\r
+ }\r
+\r
+ // route tuple through operator chain\r
+ list<host_tuple> result;\r
+ host_tuple tup;\r
+ int ret;\r
+ #ifndef PLAN_DAG\r
+ temp.channel = inf->output_channel;\r
+ #endif\r
+ operator_node* current_node = NULL, *child = NULL;\r
+ list<host_tuple> temp_output_queue;\r
+\r
+\r
+ fta_stat* tup_trace = NULL;\r
+ gs_uint32_t tup_trace_sz = 0;\r
+ gs_uint64_t trace_id = 0;\r
+ bool temp_tuple_received = false;\r
+\r
+ // if the tuple is temporal we need to extract the heartbeat payload\r
+ if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {\r
+ temp_tuple_received = true;\r
+ if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))\r
+ fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");\r
+ }\r
+\r
+ if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {\r
+\r
+ if (++ftap->num_eof_tuples < ftap->lfta_list->size())\r
+ return 0;\r
+\r
+ ftap->num_eof_tuples = 0;\r
+\r
+ /* perform a flush */\r
+ ftap->flush();\r
+\r
+ /* post eof_tuple to a parent */\r
+ host_tuple eof_tuple;\r
+ ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);\r
+\r
+ /* last byte of the tuple specify the tuple type\r
+ * set it to EOF_TUPLE\r
+ */\r
+ *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;\r
+ hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);\r
+ ftap->out_tuple_cnt++;\r
+ ftap->out_tuple_sz+=eof_tuple.tuple_size;\r
+\r
+ return 0;\r
+ }\r
+\r
+ list<host_tuple>::iterator iter2;\r
+\r
+#ifdef PLAN_DAG\r
+\r
+ // push tuple to all parent operators of the lfta\r
+ list<operator_node*>::iterator node_iter;\r
+ list<unsigned>::iterator chan_iter;\r
+ for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {\r
+ temp.channel = *chan_iter;\r
+ (*node_iter)->input_queue.push_back(temp);\r
+ }\r
+\r
+ for (i = 0; i < ftap->num_operators; ++i) {\r
+\r
+ operator_node* node = ftap->sorted_nodes[i];\r
+ list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;\r
+\r
+ // consume tuples waiting in your queue\r
+ for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {\r
+ node->op->accept_tuple(*(iter2), current_output_queue);\r
+ }\r
+ node->input_queue.clear();\r
+\r
+ // copy the tuples from output queue into input queues of all parents\r
+\r
+ if (!node->parent_list.empty()) {\r
+\r
+ // append the content of the output queue to parent input queue\r
+ for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {\r
+\r
+ int* ref_cnt = 0;\r
+ if (node->parent_list.size() > 1) {\r
+ ref_cnt = (int*)malloc(sizeof(int));\r
+ *ref_cnt = node->parent_list.size() - 1;\r
+ }\r
+\r
+ for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {\r
+ (*iter2).ref_cnt = ref_cnt;\r
+ (*iter2).channel = *chan_iter;\r
+ (*node_iter)->input_queue.push_back(*iter2);\r
+ }\r
+ }\r
+ }\r
+ temp_output_queue.clear();\r
+ }\r
+#else\r
+ current_node = inf->parent;\r
+\r
+// fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);\r
+ current_node->input_queue.push_back(temp);\r
+\r
+ do {\r
+//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);\r
+ list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;\r
+\r
+ // consume tuples waiting in your queue\r
+ for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {\r
+ current_node->op->accept_tuple((*iter2),current_output_queue);\r
+ }\r
+// All consumed, delete them\r
+ current_node->input_queue.clear();\r
+ current_node = current_node->parent;\r
+\r
+ } while (current_node);\r
+#endif\r
+\r
+\r
+ host_tuple temp_tup;\r
+\r
+ bool no_temp_tuple = false;\r
+\r
+ // if we received temporal tuple, last tuple of the result must be temporal too\r
+ // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse\r
+ if (temp_tuple_received) {\r
+\r
+ if (result.empty()) {\r
+ no_temp_tuple = true;\r
+\r
+ } else {\r
+ fta_stat stats;\r
+ temp_tup = result.back();\r
+ finalize_tuple(temp_tup);\r
+\r
+ int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);\r
+ char* new_data = (char*)malloc(new_tuple_size);\r
+ memcpy(new_data, temp_tup.data, temp_tup.tuple_size);\r
+ memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));\r
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));\r
+\r
+\r
+ memset((char*)&stats, 0, sizeof(fta_stat));\r
+ stats.ftaid = fta->ftaid;\r
+ stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
+ stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
+ stats.out_tuple_sz = ftap->out_tuple_sz;\r
+ stats.cycle_cnt = ftap->cycle_cnt;\r
+ memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));\r
+\r
+ // Send a hearbeat message to clearinghouse.\r
+ fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));\r
+\r
+ // reset the stats\r
+ ftap->in_tuple_cnt = 0;\r
+ ftap->out_tuple_cnt = 0;\r
+ ftap->out_tuple_sz = 0;\r
+ ftap->cycle_cnt = 0;\r
+\r
+ free(temp_tup.data);\r
+ temp_tup.data = new_data;\r
+ temp_tup.tuple_size = new_tuple_size;\r
+ result.pop_back();\r
+ }\r
+ }\r
+\r
+ // go through the list of returned tuples and finalyze them\r
+ // since we can produce multiple temporal tuples in DAG plans\r
+ // we can drop all of them except the last one\r
+ iter2 = result.begin();\r
+ while(iter2 != result.end()) {\r
+ host_tuple tup = *iter2;\r
+\r
+ // finalize the tuple\r
+ if (tup.tuple_size) {\r
+ finalize_tuple(tup);\r
+\r
+ #ifdef PLAN_DAG\r
+ if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))\r
+ tup.free_tuple();\r
+ else\r
+ #endif\r
+ {\r
+ ftap->output_queue.push_back(tup);\r
+ ftap->output_queue_mem += tup.tuple_size;\r
+ }\r
+\r
+ }\r
+ iter2++;\r
+ }\r
+\r
+ // append returned list to output_queue\r
+ // ftap->output_queue.splice(ftap->output_queue.end(), result);\r
+\r
+ if (temp_tuple_received && !no_temp_tuple) {\r
+ ftap->output_queue.push_back(temp_tup);\r
+ ftap->output_queue_mem += temp_tup.tuple_size;\r
+ }\r
+\r
+ // post tuples\r
+ while (!ftap->output_queue.empty()) {\r
+ host_tuple& tup = ftap->output_queue.front();\r
+ #ifdef DEBUG\r
+ fprintf(stderr, "HFTA::about to post tuple\n");\r
+ #endif\r
+ if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {\r
+ ftap->out_tuple_cnt++;\r
+ ftap->out_tuple_sz+=tup.tuple_size;\r
+ ftap->output_queue_mem -= tup.tuple_size;\r
+ tup.free_tuple();\r
+ ftap->output_queue.pop_front();\r
+ } else\r
+ break;\r
+ }\r
+\r
+ ftap->cycle_cnt += rdtsc() - start_cycle;\r
+\r
+ return 1;\r
+}\r
+\r
+gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {\r
+ MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;\r
+\r
+#ifdef HFTA_PROFILE\r
+ /* Print stats */\r
+ fprintf(stderr, "FTA = %s|", ftap->fta_name);\r
+ fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);\r
+ fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);\r
+ fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);\r
+ fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);\r
+\r
+\r
+ fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());\r
+ unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();\r
+\r
+ for (int i = 1; i < ftap->num_operators; ++i) {\r
+ operator_node* node = ftap->sorted_nodes[i];\r
+ fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());\r
+ total_mem += node->op->get_mem_footprint();\r
+ }\r
+ fprintf(stderr, ", total = %d|", total_mem );\r
+ fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );\r
+#endif\r
+\r
+ fta_stat stats;\r
+ memset((char*)&stats, 0, sizeof(fta_stat));\r
+ stats.ftaid = fta->ftaid;\r
+ stats.in_tuple_cnt = ftap->in_tuple_cnt;\r
+ stats.out_tuple_cnt = ftap->out_tuple_cnt;\r
+ stats.out_tuple_sz = ftap->out_tuple_sz;\r
+ stats.cycle_cnt = ftap->cycle_cnt;\r
+\r
+ // Send a hearbeat message to clearinghouse.\r
+ fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);\r
+\r
+ // resets runtime stats\r
+ ftap->in_tuple_cnt = 0;\r
+ ftap->out_tuple_cnt = 0;\r
+ ftap->out_tuple_sz = 0;\r
+ ftap->cycle_cnt = 0;\r
+\r
+ return 0;\r
+}\r
+\r
+\r
+#endif // __HFTA_H\r
+\r