1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
20 #include "host_tuple.h"
21 #include "base_operator.h"
28 #define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))
29 #define LLMIN(x,y) ((x)<(y)?(x):(y))
30 #define LLMAX(x,y) ((x)<(y)?(y):(x))
31 #define UMIN(x,y) ((x)<(y)?(x):(y))
32 #define UMAX(x,y) ((x)<(y)?(y):(x))
33 #define EQ(x,y) ((x)==(y))
34 #define GEQ(x,y) ((x)>=(y))
35 #define LEQ(x,y) ((x)<=(y))
36 // Cast away temporality
37 #define non_temporal(x)(x)
38 // Access math libraries
39 #define sqrt(x) sqrt(x)
47 #include <schemaparser.h>
// --- lfta_info fragment: bookkeeping for a low-level FTA (lfta) this HFTA subscribes to ---
51 gs_int32_t block_length;
55 // forward declaration of operator_node
// schema handle describing the lfta's output tuples; released below
59 gs_schemahandle_t schema_handle;
// DAG plans: an lfta can feed several parent operator nodes, one output channel each
63 list<operator_node*> parent_list;
64 list<unsigned> out_channel_list;
// tree plans: single-parent shortcut (parallel to the lists above)
66 operator_node* parent;
67 unsigned output_channel;
// presumably the destructor body — frees the schema handle only if it was
// successfully allocated (negative handle means allocation failed)
85 if (schema_handle >= 0)
86 ftaschema_free(schema_handle);
// A node in the HFTA operator tree/DAG. Children are wired on fixed channels:
// channel 0 = left input, channel 1 = right input. Inputs can be either other
// operator nodes or lftas (leaf sources).
91 struct operator_node {
// DAG support: this node's tuples may be consumed by several parents
95 list<operator_node*> parent_list;
96 list<unsigned> out_channel_list;
// tree-plan shortcut: single parent
98 operator_node* parent;
// operator inputs — either child operator nodes or leaf lftas
101 operator_node* left_child;
102 operator_node* right_child;
103 lfta_info* left_lfta;
104 lfta_info* right_lfta;
// tuples queued for this operator, consumed in accept/flush passes
106 list<host_tuple> input_queue;
108 operator_node(base_operator* o) {
113 left_child = right_child = NULL;
114 left_lfta = right_lfta = NULL;
// wire child node into the left (channel 0) input; registers both the DAG-style
// parent list entry and the tree-style single parent
121 void set_left_child_node(operator_node* child) {
125 child->parent_list.push_back(this);
126 child->out_channel_list.push_back(0);
128 child->parent = this;
129 child->op->set_output_channel(0);
// wire child node into the right (channel 1) input
134 void set_right_child_node(operator_node* child) {
138 child->parent_list.push_back(this);
139 child->out_channel_list.push_back(1);
141 child->parent = this;
142 child->op->set_output_channel(1);
// wire an lfta as the left (channel 0) input
// NOTE(review): the lines shown use the left_lfta member directly; the
// assignment left_lfta = l_lfta presumably happens on a line not visible here — confirm
147 void set_left_lfta(lfta_info* l_lfta) {
151 left_lfta->parent_list.push_back(this);
152 left_lfta->out_channel_list.push_back(0);
154 left_lfta->parent = this;
155 left_lfta->output_channel = 0;
// wire an lfta as the right (channel 1) input (same assignment caveat as above)
160 void set_right_lfta(lfta_info* r_lfta) {
164 right_lfta->parent_list.push_back(this);
165 right_lfta->out_channel_list.push_back(1);
167 right_lfta->parent = this;
168 right_lfta->output_channel = 1;
// Shared helpers: unpack the per-lfta parameter blocks out of the HFTA-level
// parameter block, and finalize (pack/prepare) an output tuple before posting.
177 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
178 void finalize_tuple(host_tuple &tup);
// Entry points for the single-operator HFTA (UNOP), installed into struct FTA's
// function-pointer slots by the UNOP_HFTA constructor.
180 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
181 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
182 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
183 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
// Entry points for the multi-operator (tree/DAG) HFTA (MULTOP).
185 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
186 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
187 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
188 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
// --- UNOP_HFTA fragment: HFTA wrapping exactly one operator over one lfta ---
195 gs_schemahandle_t schema_handle;
// tuples produced but not yet successfully posted to the parent
197 list<host_tuple> output_queue;
199 // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
200 // lfta_name and an instance of the base_operator
201 // We don't need to know the output schema as this information is already embeded
202 // in create_output_tuple method of operators' functor.
203 UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
204 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
209 schema_handle = sch_handle;
// use our own address as the stream id so callbacks can recover `this`
213 _fta.ftaid.streamid = (gs_p_t)this;
216 fprintf(stderr,"Instantiate a FTA\n");
218 /* extract lfta param block from hfta param block */
219 list<param_block> param_list;
220 get_lfta_params(sz, value, param_list);
221 param_block param = param_list.front();
224 gs_uint32_t reuse_flag = 2;
225 // we will try to create a new instance of child FTA only if it is parameterized
226 if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
227 f.streamid = 0; // not interested in existing instances
// instantiate (or attach to) the child lfta, pushing its parameter block down
231 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
232 fprintf(stderr,"HFTA::error:could instantiate a FTA");
238 // set the operator's parameters
239 if(oper->set_param_block(sz, (void*)value)) failed = true;
242 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
243 _fta.stream_subscribed_cnt = 1;
244 _fta.stream_subscribed[0] = f;
// install the UNOP entry points into the embedded struct FTA
246 _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
247 _fta.free_fta = UNOP_HFTA_free_fta;
248 _fta.control_fta = UNOP_HFTA_control_fta;
249 _fta.accept_packet = UNOP_HFTA_accept_packet;
250 _fta.clock_fta = UNOP_HFTA_clock_fta;
254 delete oper; // free operators memory
// flush: drain the operator's buffered state into res, finalize each tuple,
// then move everything onto the output queue and try to post
259 list<host_tuple> res;
260 if (!oper->flush(res)) {
263 // go through the list of returned tuples and finalize them
264 list<host_tuple>::iterator iter = res.begin();
265 while (iter != res.end()) {
266 host_tuple& tup = *iter;
268 // finalize the tuple
274 // append returned list to output_queue
275 output_queue.splice(output_queue.end(), res);
// post in FIFO order; a tuple is dequeued only when hfta_post_tuple succeeds (== 0)
279 while (!output_queue.empty()) {
280 host_tuple& tup = output_queue.front();
282 fprintf(stderr, "HFTA::about to post tuple\n");
284 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
286 output_queue.pop_front();
// true if the constructor failed (bad params / lfta instantiation failure)
296 bool init_failed(){return failed;}
// Teardown entry point for a UNOP HFTA: releases the subscribed lfta instance.
299 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
300 UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor
301 // will be called on program exit
304 // free instance we are subscribed to
305 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
// Control entry point for a UNOP HFTA: forwards FLUSH to the child lfta and
// handles parameter reloads (flush first, then push new params down and notify
// the operator).
311 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
312 UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
314 if (command == FTA_COMMAND_FLUSH) {
315 // ask lfta to do the flush
316 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
319 } else if (command == FTA_COMMAND_LOAD_PARAMS) {
320 // flush the lfta first so no tuples computed under the old parameters remain buffered
321 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
324 /* extract lfta param block from hfta param block */
325 list<param_block> param_list;
326 get_lfta_params(sz, value, param_list);
327 param_block param = param_list.front();
328 // load new parameters into lfta
329 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
332 // notify the operator about the change of parameter
333 ftap->oper->set_param_block(sz, (void*)value);
335 } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
336 // we no longer use temp_status commands
337 // heartbeat mechanism is used instead
// Data-path entry point for a UNOP HFTA: wraps the incoming packet as a
// host_tuple, handles temporal/EOF tuples, runs it through the operator,
// extends the trace on temporal results, and posts finished tuples upstream.
342 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
343 UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
345 fprintf(stderr, "HFTA::accepted packet\n");
347 if (!length) /* ignore null tuples */
// wrap the raw packet in a host_tuple without copying the payload
351 temp.tuple_size = length;
354 temp.heap_resident = false;
356 // pass the tuple to operator
357 list<host_tuple> res;
// trace bookkeeping extracted from temporal (heartbeat) tuples
359 fta_stat* tup_trace = NULL;
360 gs_uint32_t tup_trace_sz = 0;
361 gs_uint64_t trace_id = 0;
362 bool temp_tuple_received = false;
365 // if the tuple is temporal we need to extract the heartbeat payload
366 if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
367 temp_tuple_received = true;
368 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
369 fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
// EOF from the source: flush the operator and forward an EOF-typed status tuple
372 if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
373 /* perform a flush */
376 /* post eof_tuple to a parent */
377 host_tuple eof_tuple;
378 ftap->oper->get_temp_status(eof_tuple);
380 /* last byte of the tuple specifies the tuple type
381 * set it to EOF_TUPLE
383 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
384 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
389 ret = ftap->oper->accept_tuple(temp, res);
391 // go through the list of returned tuples and finalize them
392 list<host_tuple>::iterator iter = res.begin();
393 while (iter != res.end()) {
394 host_tuple& tup = *iter;
396 // finalize the tuple
402 // if we received temporal tuple, last tuple of the result must be temporal too
403 // we need to extend the trace by adding ftaid and send a heartbeat to clearinghouse
404 if (temp_tuple_received) {
406 host_tuple& temp_tup = res.back();
// grow the temporal tuple: payload + trace_id + (inherited trace + our own fta_stat)
409 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
410 char* new_data = (char*)malloc(new_tuple_size);
411 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
412 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
413 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
414 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
// NOTE(review): the memcpy on line 417 writes a zeroed fta_stat to the SAME
// offset as the FTAID copied on line 414, overwriting it; unlike the MULTOP
// version (lines 976-980) no stats fields are populated in the lines visible
// here — confirm against the full source whether this is intentional.
416 memset((char*)&stats, 0, sizeof(fta_stat));
417 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
419 // Send a heartbeat message to clearinghouse.
420 fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
// swap the extended buffer into the result tuple
422 temp_tup.free_tuple();
423 temp_tup.data = new_data;
424 temp_tup.tuple_size = new_tuple_size;
427 // append returned list to output_queue
428 ftap->output_queue.splice(ftap->output_queue.end(), res);
// post in FIFO order; dequeue only on successful post (== 0) — failed tuples
// stay queued (failure-path lines not visible here)
431 while (!ftap->output_queue.empty()) {
432 host_tuple& tup = ftap->output_queue.front();
434 fprintf(stderr, "HFTA::about to post tuple\n");
436 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
438 ftap->output_queue.pop_front();
// Periodic clock callback for a UNOP HFTA: emit an empty liveness heartbeat.
446 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
448 // Send a heartbeat message to clearinghouse to indicate we are alive
449 fta_heartbeat(ftap->ftaid, 0, 0, 0);
// --- MULTOP_HFTA state fragment: HFTA hosting a tree/DAG of operators ---
458 gs_schemahandle_t schema_handle;
// operators in topological (leaf-to-root) order, built by the constructor
460 vector<operator_node*> sorted_nodes;
// the leaf lftas this HFTA subscribes to
462 list<lfta_info*> *lfta_list;
463 /* number of eof tuples we received so far
464 * receiving eof tuples from every source fta will cause a flush
// tuples produced but not yet successfully posted upstream
471 list<host_tuple> output_queue;
// runtime statistics reported via heartbeats (reset after each report)
474 gs_uint32_t in_tuple_cnt;
475 gs_uint32_t out_tuple_cnt;
476 gs_uint32_t out_tuple_sz;
477 gs_uint64_t cycle_cnt;
479 gs_uint64_t trace_id;
481 // memory occupied by output queue
482 gs_uint32_t output_queue_mem;
485 // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
486 // lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
487 // as the schema handle is already passed during operator creation time.
488 // We also don't need to know the output schema as this information is already embeded
489 // in create_output_tuple method of operators' functor.
493 MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
494 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
// use our own address as the stream id so callbacks can recover `this`
504 _fta.ftaid.streamid = (gs_p_t)this;
506 schema_handle = sch_handle;
508 output_queue_mem = 0;
510 // topologically sort the operators in the tree (or DAG)
511 // for DAG we make sure we add the node to the sorted list only once
512 operator_node* current_node;
513 map<operator_node*, int> node_map;
514 vector<operator_node*> node_list;
// phase 1: BFS from the root, numbering each distinct node exactly once
517 node_list.push_back(root);
522 while (i < node_list.size()) {
523 current_node = node_list[i];
524 if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
525 node_map[current_node->left_child] = num_operators++;
526 node_list.push_back(current_node->left_child);
528 if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
529 node_map[current_node->right_child] = num_operators++;
530 node_list.push_back(current_node->right_child);
// phase 2: adjacency lists (edges parent -> child indices); the shared-child
// guard avoids a duplicate edge when both inputs are the same node
536 // build adjacency lists for query DAG
537 list<int>* adj_lists = new list<int>[num_operators];
538 bool* leaf_flags = new bool[num_operators];
539 memset(leaf_flags, 0, num_operators * sizeof(bool));
540 for (i = 0; i < num_operators; ++i) {
541 current_node = node_list[i];
542 if (current_node->left_child) {
543 adj_lists[i].push_back(node_map[current_node->left_child]);
545 if (current_node->right_child && current_node->left_child != current_node->right_child) {
546 adj_lists[i].push_back(node_map[current_node->right_child]);
// phase 3: repeatedly peel off leaves (nodes with no remaining children)
550 // run topological sort
551 bool leaf_found = true;
554 // add all leafs to sorted_nodes
555 for (i = 0; i < num_operators; ++i) {
556 if (!leaf_flags[i] && adj_lists[i].empty()) {
557 leaf_flags[i] = true;
558 sorted_nodes.push_back(node_list[i]);
561 // remove the node from its parents adjacency lists
562 for (int j = 0; j < num_operators; ++j) {
563 list<int>::iterator iter;
564 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
566 adj_lists[j].erase(iter);
578 // set the parameter block for every operator in tree
579 for (i = 0; i < num_operators; ++i)
580 if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
583 fprintf(stderr,"Instantiate FTAs\n");
585 /* extract lfta param block from hfta param block */
586 // NOTE: param_list must line up with lfta_list
587 list<param_block> param_list;
588 get_lfta_params(sz, value, param_list);
589 list<param_block>::iterator iter1;
590 list<lfta_info*>::iterator iter2 = lfta_list->begin();
// phase 4: instantiate (or attach to) every subscribed lfta, pushing each
// lfta's own parameter block down with the alloc call
592 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
593 lfta_info* inf = *iter2;
596 fprintf(stderr,"Instantiate a FTA\n");
599 gs_uint32_t reuse_flag = 2;
601 // we will try to create a new instance of child FTA only if it is parameterized
602 if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
603 (*iter2)->f.streamid = 0; // not interested in existing instances
606 if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
607 fprintf(stderr,"HFTA::error:could instantiate a FTA");
614 //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
616 _fta.stream_subscribed[i]=(*iter2)->f;
618 _fta.stream_subscribed_cnt = i;
// install the MULTOP entry points into the embedded struct FTA
622 _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
623 _fta.free_fta = MULTOP_HFTA_free_fta;
624 _fta.control_fta = MULTOP_HFTA_control_fta;
625 _fta.accept_packet = MULTOP_HFTA_accept_packet;
626 _fta.clock_fta = MULTOP_HFTA_clock_fta;
628 // init runtime stats
// presumably the destructor: free each subscribed lfta, then the operator tree
638 list<lfta_info*>::iterator iter;
641 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
645 delete root; // free operators memory
// Flush fragment: drive every operator (in topological order) to drain its
// input queue and flush buffered state; route intermediate tuples to parent
// operators and final tuples into the output queue, then post upstream.
653 list<host_tuple> res;
655 // go through the list of operators in topological order
657 list<host_tuple>::iterator iter;
658 list<host_tuple> temp_output_queue;
660 for (int i = 0; i < num_operators; ++i) {
661 operator_node* node = sorted_nodes[i];
// NOTE(review): two alternative declarations of current_output_queue appear
// below — a DAG variant buffering into temp_output_queue and a tree variant
// writing straight into the parent's input queue. Presumably one is selected
// by conditional compilation; the selecting preprocessor lines are not
// visible here — confirm against the full source.
664 list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
666 // for trees we can put output tuples directly into parent's input buffer
667 list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
669 // consume tuples waiting in your queue
670 for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
671 node->op->accept_tuple(*(iter), current_output_queue);
673 node->op->flush(current_output_queue);
674 node->input_queue.clear();
677 // copy the tuples from output queue into input queues of all parents
678 list<operator_node*>::iterator node_iter;
680 if (!node->parent_list.empty()) {
681 // append the content of the output queue to parent input queue
683 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
// fan-out: share one tuple among several parents via a malloc'ed ref count
685 if (node->parent_list.size() > 1) {
686 ref_cnt = (int*)malloc(sizeof(int));
687 *ref_cnt = node->parent_list.size() - 1;
690 for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
691 (*iter).ref_cnt = ref_cnt;
692 (*node_iter)->input_queue.push_back(*iter);
700 // go through the list of returned tuples and finalize them
701 list<host_tuple>::iterator iter = res.begin();
702 while (iter != res.end()) {
703 host_tuple& tup = *iter;
705 // finalize the tuple
709 output_queue_mem += tup.tuple_size;
713 // append returned list to output_queue
714 output_queue.splice(output_queue.end(), res);
// post in FIFO order; dequeue (and release its memory accounting) only on
// successful post (== 0)
718 while (!output_queue.empty()) {
719 host_tuple& tup = output_queue.front();
721 fprintf(stderr, "HFTA::about to post tuple\n");
723 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
724 output_queue_mem -= tup.tuple_size;
726 output_queue.pop_front();
// true if the constructor failed (bad params / lfta instantiation failure)
735 bool init_failed(){return failed;}
// Teardown entry point for a MULTOP HFTA: releases every subscribed lfta instance.
739 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
740 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor
741 // will be called on program exit
744 // free instance we are subscribed to
745 list<lfta_info*>::iterator iter;
// walk lfta_list in step with the subscription count
748 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
749 fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
// Control entry point for a MULTOP HFTA: forwards FLUSH to every child lfta
// and handles parameter reloads (flush each lfta, push its new param block
// down, then update every operator in the tree).
757 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
758 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
760 if (command == FTA_COMMAND_FLUSH) {
762 // ask lftas to do the flush
763 list<lfta_info*>::iterator iter;
764 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
765 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
767 // flush hfta operators
770 } else if (command == FTA_COMMAND_LOAD_PARAMS) {
// unpack one param block per lfta (param_list lines up with lfta_list)
772 list<param_block> param_list;
773 get_lfta_params(sz, value, param_list);
775 // ask lftas to do the flush and set new parameters
776 list<lfta_info*>::iterator iter;
777 list<param_block>::iterator iter2;
778 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
779 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
780 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
783 // flush hfta operators
786 // set the new parameter block for every operator in tree
787 for (int i = 0; i < ftap->num_operators; ++i)
788 ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
790 } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
791 // we no longer use temp_status commands
792 // heartbeat mechanism is used instead
// Data-path entry point for a MULTOP HFTA: identify the source lfta, route the
// tuple through the operator tree/DAG in topological order, extend the trace
// on temporal tuples, and post finished tuples upstream. Also accumulates the
// runtime stats (tuple counts, cycle count) reported via heartbeats.
797 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
798 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
// cycle accounting for this call (added to cycle_cnt at the end)
800 gs_uint64_t start_cycle = rdtsc();
802 fprintf(stderr, "HFTA::accepted packet\n");
804 if (!length) /* ignore null tuples */
807 ftap->in_tuple_cnt++;
// wrap the raw packet in a host_tuple without copying the payload
810 temp.tuple_size = length;
813 temp.heap_resident = false;
815 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
817 // find from which lfta the tuple came
818 list<lfta_info*>::iterator iter;
819 lfta_info* inf = NULL;
// match the sender's full FTAID (ip, port, index, streamid) against subscriptions
822 for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
823 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&
824 ftap->_fta.stream_subscribed[i].port == ftaid->port &&
825 ftap->_fta.stream_subscribed[i].index == ftaid->index &&
826 ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
833 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
837 // route tuple through operator chain
838 list<host_tuple> result;
842 temp.channel = inf->output_channel;
844 operator_node* current_node = NULL, *child = NULL;
845 list<host_tuple> temp_output_queue;
// trace bookkeeping extracted from temporal (heartbeat) tuples
848 fta_stat* tup_trace = NULL;
849 gs_uint32_t tup_trace_sz = 0;
850 gs_uint64_t trace_id = 0;
851 bool temp_tuple_received = false;
853 // if the tuple is temporal we need to extract the heartbeat payload
854 if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
855 temp_tuple_received = true;
856 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
857 fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
// EOF handling: wait until every source lfta has sent EOF, then flush the
// whole tree and forward a single EOF-typed status tuple
860 if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
862 if (++ftap->num_eof_tuples < ftap->lfta_list->size())
865 ftap->num_eof_tuples = 0;
867 /* perform a flush */
870 /* post eof_tuple to a parent */
871 host_tuple eof_tuple;
872 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
874 /* last byte of the tuple specify the tuple type
875 * set it to EOF_TUPLE
877 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
878 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
879 ftap->out_tuple_cnt++;
880 ftap->out_tuple_sz+=eof_tuple.tuple_size;
885 list<host_tuple>::iterator iter2;
// DAG path: seed the tuple into every parent of the source lfta (correct
// channel per parent), then run all operators in topological order
889 // push tuple to all parent operators of the lfta
890 list<operator_node*>::iterator node_iter;
891 list<unsigned>::iterator chan_iter;
892 for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
893 temp.channel = *chan_iter;
894 (*node_iter)->input_queue.push_back(temp);
897 for (i = 0; i < ftap->num_operators; ++i) {
899 operator_node* node = ftap->sorted_nodes[i];
// last (root) operator writes into result; everything else is buffered
900 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
902 // consume tuples waiting in your queue
903 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
904 node->op->accept_tuple(*(iter2), current_output_queue);
906 node->input_queue.clear();
908 // copy the tuples from output queue into input queues of all parents
910 if (!node->parent_list.empty()) {
912 // append the content of the output queue to parent input queue
913 for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
// fan-out: share one tuple among several parents via a malloc'ed ref count
916 if (node->parent_list.size() > 1) {
917 ref_cnt = (int*)malloc(sizeof(int));
918 *ref_cnt = node->parent_list.size() - 1;
921 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
922 (*iter2).ref_cnt = ref_cnt;
923 (*iter2).channel = *chan_iter;
924 (*node_iter)->input_queue.push_back(*iter2);
928 temp_output_queue.clear();
// tree path: walk the single-parent chain up from the source lfta's parent
931 current_node = inf->parent;
933 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
934 current_node->input_queue.push_back(temp);
937 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
938 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
940 // consume tuples waiting in your queue
941 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
942 current_node->op->accept_tuple((*iter2),current_output_queue);
944 // All consumed, delete them
945 current_node->input_queue.clear();
946 current_node = current_node->parent;
948 } while (current_node);
954 bool no_temp_tuple = false;
956 // if we received temporal tuple, last tuple of the result must be temporal too
957 // we need to extend the trace by adding ftaid and send a heartbeat to clearinghouse
958 if (temp_tuple_received) {
960 if (result.empty()) {
961 no_temp_tuple = true;
965 temp_tup = result.back();
966 finalize_tuple(temp_tup);
// grow the temporal tuple: payload + trace_id + (inherited trace + our own fta_stat)
968 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
969 char* new_data = (char*)malloc(new_tuple_size);
970 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
971 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
972 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
// append our own stats entry to the trace
975 memset((char*)&stats, 0, sizeof(fta_stat));
976 stats.ftaid = fta->ftaid;
977 stats.in_tuple_cnt = ftap->in_tuple_cnt;
978 stats.out_tuple_cnt = ftap->out_tuple_cnt;
979 stats.out_tuple_sz = ftap->out_tuple_sz;
980 stats.cycle_cnt = ftap->cycle_cnt;
981 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
983 // Send a heartbeat message to clearinghouse.
984 fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
// stats were just reported in the trace — reset the counters
987 ftap->in_tuple_cnt = 0;
988 ftap->out_tuple_cnt = 0;
989 ftap->out_tuple_sz = 0;
// swap the extended buffer into the temporal tuple
993 temp_tup.data = new_data;
994 temp_tup.tuple_size = new_tuple_size;
999 // go through the list of returned tuples and finalize them
1000 // since we can produce multiple temporal tuples in DAG plans
1001 // we can drop all of them except the last one
1002 iter2 = result.begin();
1003 while(iter2 != result.end()) {
1004 host_tuple tup = *iter2;
1006 // finalize the tuple
1007 if (tup.tuple_size) {
1008 finalize_tuple(tup);
// intermediate temporal tuples are dropped (only the extended one survives)
1011 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
1016 ftap->output_queue.push_back(tup);
1017 ftap->output_queue_mem += tup.tuple_size;
1024 // append returned list to output_queue
1025 // ftap->output_queue.splice(ftap->output_queue.end(), result);
// the extended temporal tuple is enqueued last
1027 if (temp_tuple_received && !no_temp_tuple) {
1028 ftap->output_queue.push_back(temp_tup);
1029 ftap->output_queue_mem += temp_tup.tuple_size;
// post in FIFO order; dequeue (and update stats/memory accounting) only on
// successful post (== 0)
1033 while (!ftap->output_queue.empty()) {
1034 host_tuple& tup = ftap->output_queue.front();
1036 fprintf(stderr, "HFTA::about to post tuple\n");
1038 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
1039 ftap->out_tuple_cnt++;
1040 ftap->out_tuple_sz+=tup.tuple_size;
1041 ftap->output_queue_mem -= tup.tuple_size;
1043 ftap->output_queue.pop_front();
1048 ftap->cycle_cnt += rdtsc() - start_cycle;
// Periodic clock callback for a MULTOP HFTA: dump runtime/memory stats to
// stderr, send them to the clearinghouse as a heartbeat, then reset counters.
1053 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1054 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
1058 fprintf(stderr, "FTA = %s|", ftap->fta_name);
1059 fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1060 fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1061 fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1062 fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
// per-operator memory footprints plus the total
1065 fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1066 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1068 for (int i = 1; i < ftap->num_operators; ++i) {
1069 operator_node* node = ftap->sorted_nodes[i];
1070 fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1071 total_mem += node->op->get_mem_footprint();
1073 fprintf(stderr, ", total = %d|", total_mem );
1074 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
// package the same stats into an fta_stat for the heartbeat
1078 memset((char*)&stats, 0, sizeof(fta_stat));
1079 stats.ftaid = fta->ftaid;
1080 stats.in_tuple_cnt = ftap->in_tuple_cnt;
1081 stats.out_tuple_cnt = ftap->out_tuple_cnt;
1082 stats.out_tuple_sz = ftap->out_tuple_sz;
1083 stats.cycle_cnt = ftap->cycle_cnt;
1085 // Send a heartbeat message to clearinghouse.
1086 fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1088 // resets runtime stats
1089 ftap->in_tuple_cnt = 0;
1090 ftap->out_tuple_cnt = 0;
1091 ftap->out_tuple_sz = 0;
1092 ftap->cycle_cnt = 0;