1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
20 #include "host_tuple.h"
21 #include "base_operator.h"
// Helper macros used by generated HFTA code.
// NOTE: these are classic function-like macros — every argument may be
// evaluated more than once, so callers must not pass expressions with
// side effects (e.g. LLMAX(i++, j)).
// hfta_ullong_hashfunc: folds a 64-bit value (accessed through a pointer)
// into an int by XOR-ing its low and high 32-bit halves.
27 #define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))
28 #define LLMIN(x,y) ((x)<(y)?(x):(y))
29 #define LLMAX(x,y) ((x)<(y)?(y):(x))
30 #define UMIN(x,y) ((x)<(y)?(x):(y))
31 #define UMAX(x,y) ((x)<(y)?(y):(x))
32 #define EQ(x,y) ((x)==(y))
33 #define GEQ(x,y) ((x)>=(y))
34 #define LEQ(x,y) ((x)<=(y))
// Cast away temporality
// (identity macro — strips the "temporal" attribute at the source level only)
36 #define non_temporal(x)(x)
45 #include <schemaparser.h>
// Fragment of the lfta_info bookkeeping structure (declaration is partially
// elided in this listing): describes one low-level FTA this HFTA subscribes to.
49 gs_int32_t block_length;
53 // forward declaration of operator_node
// Parsed-schema handle for the lfta's output tuples.
57 gs_schemahandle_t schema_handle;
// Operator nodes that consume this lfta's stream, and the input channel
// each of them reads it on (lists are kept in lock step).
61 list<operator_node*> parent_list;
62 list<unsigned> out_channel_list;
// Legacy single-parent fields (tree-shaped plans).
64 operator_node* parent;
65 unsigned output_channel;
// Destructor body fragment: release the schema handle if it was allocated
// (negative handle means "never opened").
83 if (schema_handle >= 0)
84 ftaschema_free(schema_handle);
// One node in the HFTA operator tree/DAG. Wraps a base_operator and wires up
// parent/child links plus the input channel numbers each edge uses.
89 struct operator_node {
// Multi-parent (DAG) wiring: parents and the channel this node feeds each on.
93 list<operator_node*> parent_list;
94 list<unsigned> out_channel_list;
// Single-parent (tree) wiring, kept alongside the list form.
96 operator_node* parent;
// Children may be either other operator nodes or lftas (the leaves).
99 operator_node* left_child;
100 operator_node* right_child;
101 lfta_info* left_lfta;
102 lfta_info* right_lfta;
// Tuples queued for this operator to consume on the next routing pass.
104 list<host_tuple> input_queue;
106 operator_node(base_operator* o) {
111 left_child = right_child = NULL;
112 left_lfta = right_lfta = NULL;
// Attach an operator child on channel 0; updates both the DAG-style lists
// and the legacy single-parent fields on the child.
119 void set_left_child_node(operator_node* child) {
123 child->parent_list.push_back(this);
124 child->out_channel_list.push_back(0);
126 child->parent = this;
127 child->op->set_output_channel(0);
// Attach an operator child on channel 1 (right-hand input).
132 void set_right_child_node(operator_node* child) {
136 child->parent_list.push_back(this);
137 child->out_channel_list.push_back(1);
139 child->parent = this;
140 child->op->set_output_channel(1);
// Attach an lfta leaf on channel 0.
145 void set_left_lfta(lfta_info* l_lfta) {
149 left_lfta->parent_list.push_back(this);
150 left_lfta->out_channel_list.push_back(0);
152 left_lfta->parent = this;
153 left_lfta->output_channel = 0;
// Attach an lfta leaf on channel 1.
158 void set_right_lfta(lfta_info* r_lfta) {
162 right_lfta->parent_list.push_back(this);
163 right_lfta->out_channel_list.push_back(1);
165 right_lfta->parent = this;
166 right_lfta->output_channel = 1;
// Helper and callback declarations.
// get_lfta_params: splits the HFTA parameter block into per-lfta blocks.
175 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
// finalize_tuple: post-processes an operator output tuple before posting.
// (A byte-identical duplicate declaration on the following original line was
// removed — redeclaring a member function in-class is ill-formed.)
176 void finalize_tuple(host_tuple &tup);
// FTA callback entry points for the single-operator HFTA flavor.
178 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
179 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
180 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
181 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
// FTA callback entry points for the multi-operator HFTA flavor.
183 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
184 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
185 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
186 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
// UNOP_HFTA fragment: HFTA hosting a single operator over one child lfta.
193 gs_schemahandle_t schema_handle;
// Tuples produced but not yet successfully posted to the ring buffer.
195 list<host_tuple> output_queue;
197 // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
198 // lfta_name and an instance of the base_operator
199 // We don't need to know the output schema as this information is already embedded
200 // in create_output_tuple method of operators' functor.
201 UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
202 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
207 schema_handle = sch_handle;
// Use our own address as the stream id so callbacks can recover `this`.
211 _fta.ftaid.streamid = (gs_p_t)this;
214 fprintf(stderr,"Instantiate a FTA\n");
216 /* extract lfta param block from hfta param block */
217 list<param_block> param_list;
218 get_lfta_params(sz, value, param_list);
219 param_block param = param_list.front();
222 gs_uint32_t reuse_flag = 2;
223 // we will try to create a new instance of child FTA only if it is parameterized
224 if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
225 f.streamid = 0; // not interested in existing instances
229 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
// NOTE(review): error message appears to be missing the word "not"
// ("could not instantiate") — confirm against the original source.
230 fprintf(stderr,"HFTA::error:could instantiate a FTA");
236 // set the operator's parameters
// NOTE(review): stray second semicolon at end of this line (harmless).
237 if(oper->set_param_block(sz, (void*)value)) failed = true;;
240 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
241 _fta.stream_subscribed_cnt = 1;
242 _fta.stream_subscribed[0] = f;
// Install the FTA callback table for the single-operator flavor.
244 _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
245 _fta.free_fta = UNOP_HFTA_free_fta;
246 _fta.control_fta = UNOP_HFTA_control_fta;
247 _fta.accept_packet = UNOP_HFTA_accept_packet;
248 _fta.clock_fta = UNOP_HFTA_clock_fta;
// Destructor fragment.
252 delete oper; // free operators memory
// Flush fragment: drain the operator and queue its output for posting.
257 list<host_tuple> res;
258 if (!oper->flush(res)) {
261 // go through the list of returned tuples and finalize them
262 list<host_tuple>::iterator iter = res.begin();
263 while (iter != res.end()) {
264 host_tuple& tup = *iter;
266 // finalize the tuple
272 // append returned list to output_queue
273 output_queue.splice(output_queue.end(), res);
// Post as many queued tuples as the ring buffer accepts; stop on first failure
// (hfta_post_tuple != 0), leaving the remainder queued for a later attempt.
277 while (!output_queue.empty()) {
278 host_tuple& tup = output_queue.front();
280 fprintf(stderr, "HFTA::about to post tuple\n");
282 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
284 output_queue.pop_front();
// Reports whether construction failed (e.g. set_param_block error).
294 bool init_failed(){return failed;}
// free_fta callback: tears down a UNOP_HFTA, releasing the child lfta instance.
297 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
298 UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor
299 // will be called on program exit
302 // free instance we are subscribed to
303 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
// control_fta callback: forwards FLUSH / LOAD_PARAMS commands to the child
// lfta and (for LOAD_PARAMS) re-parameterizes the local operator.
309 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
310 UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
312 if (command == FTA_COMMAND_FLUSH) {
313 // ask lfta to do the flush
314 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
317 } else if (command == FTA_COMMAND_LOAD_PARAMS) {
318 // flush the lfta first so no tuples computed under the old
319 // parameters remain buffered, then load the new parameters below
319 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
322 /* extract lfta param block from hfta param block */
323 list<param_block> param_list;
324 get_lfta_params(sz, value, param_list);
325 param_block param = param_list.front();
326 // load new parameters into lfta
327 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
330 // notify the operator about the change of parameter
331 ftap->oper->set_param_block(sz, (void*)value);
333 } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
334 // we no longer use temp_status commands
335 // heartbeat mechanism is used instead
// accept_packet callback: wraps an incoming lfta packet as a host_tuple, runs
// it through the operator, finalizes/queues the results, and posts them.
340 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
341 UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
343 fprintf(stderr, "HFTA::accepted packet\n");
345 if (!length) /* ignore null tuples */
// Wrap the packet without copying; the tuple points into the caller's buffer.
349 temp.tuple_size = length;
352 temp.heap_resident = false;
354 // pass the tuple to operator
355 list<host_tuple> res;
// Heartbeat trace state extracted from a temporal tuple, if any.
357 fta_stat* tup_trace = NULL;
358 gs_uint32_t tup_trace_sz = 0;
359 gs_uint64_t trace_id = 0;
360 bool temp_tuple_received = false;
363 // if the tuple is temporal we need to extract the heartbeat payload
364 if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
365 temp_tuple_received = true;
366 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
367 fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n")
370 if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
371 /* perform a flush */
374 /* post eof_tuple to a parent */
375 host_tuple eof_tuple;
376 ftap->oper->get_temp_status(eof_tuple);
378 /* last byte of the tuple specifies the tuple type
379 * set it to EOF_TUPLE
381 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
382 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
387 ret = ftap->oper->accept_tuple(temp, res);
389 // go through the list of returned tuples and finalize them
390 list<host_tuple>::iterator iter = res.begin();
391 while (iter != res.end()) {
392 host_tuple& tup = *iter;
394 // finalize the tuple
400 // if we received temporal tuple, last tuple of the result must be temporal too
401 // we need to extend the trace by adding ftaid and send a heartbeat to clearinghouse
402 if (temp_tuple_received) {
404 host_tuple& temp_tup = res.back();
// Grow the temporal tuple: [original tuple][trace_id][old trace][our fta_stat].
407 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
408 char* new_data = (char*)malloc(new_tuple_size);
409 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
410 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
411 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
// NOTE(review): this FTAID copy targets the same offset as the fta_stat copy
// two lines below, so it is immediately overwritten; the MULTOP version
// instead sets stats.ftaid before copying. Here `stats` is left zeroed —
// looks like the trace entry loses the FTAID. Confirm against original source.
412 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
414 memset((char*)&stats, 0, sizeof(fta_stat));
415 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
417 // Send a heartbeat message to clearinghouse.
418 fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
// Swap the extended buffer into the result tuple, freeing the old payload.
420 temp_tup.free_tuple();
421 temp_tup.data = new_data;
422 temp_tup.tuple_size = new_tuple_size;
425 // append returned list to output_queue
426 ftap->output_queue.splice(ftap->output_queue.end(), res);
// Post queued tuples until the ring buffer refuses one (0 == success).
429 while (!ftap->output_queue.empty()) {
430 host_tuple& tup = ftap->output_queue.front();
432 fprintf(stderr, "HFTA::about to post tuple\n");
434 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
436 ftap->output_queue.pop_front();
// clock_fta callback: periodic liveness heartbeat (no stats payload).
444 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
446 // Send a heartbeat message to clearinghouse to indicate we are alive
447 fta_heartbeat(ftap->ftaid, 0, 0, 0);
// MULTOP_HFTA member fragment: HFTA hosting a tree/DAG of operators over
// multiple child lftas.
456 gs_schemahandle_t schema_handle;
// Operators in topological (leaf-to-root) execution order.
458 vector<operator_node*> sorted_nodes;
460 list<lfta_info*> *lfta_list;
461 /* number of eof tuples we received so far
462 * receiving eof tuples from every source fta will cause a flush
// Tuples produced but not yet successfully posted.
469 list<host_tuple> output_queue;
// Runtime statistics reported via heartbeats.
472 gs_uint32_t in_tuple_cnt;
473 gs_uint32_t out_tuple_cnt;
474 gs_uint32_t out_tuple_sz;
475 gs_uint64_t cycle_cnt;
477 gs_uint64_t trace_id;
479 // memory occupied by output queue
480 gs_uint32_t output_queue_mem;
491 MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
492 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
502 _fta.ftaid.streamid = (gs_p_t)this;
504 schema_handle = sch_handle;
506 output_queue_mem = 0;
508 // topologically sort the operators in the tree (or DAG)
509 // for DAG we make sure we add the node to the sorted list only once
510 operator_node* current_node;
511 map<operator_node*, int> node_map;
512 vector<operator_node*> node_list;
515 node_list.push_back(root);
520 while (i < node_list.size()) {
521 current_node = node_list[i];
522 if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
523 node_map[current_node->left_child] = num_operators++;
524 node_list.push_back(current_node->left_child);
526 if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
527 node_map[current_node->right_child] = num_operators++;
528 node_list.push_back(current_node->right_child);
534 // build adjacency lists for query DAG
535 list<int>* adj_lists = new list<int>[num_operators];
536 bool* leaf_flags = new bool[num_operators];
537 memset(leaf_flags, 0, num_operators * sizeof(bool));
538 for (i = 0; i < num_operators; ++i) {
539 current_node = node_list[i];
540 if (current_node->left_child) {
541 adj_lists[i].push_back(node_map[current_node->left_child]);
543 if (current_node->right_child && current_node->left_child != current_node->right_child) {
544 adj_lists[i].push_back(node_map[current_node->right_child]);
548 // run topolofical sort
549 bool leaf_found = true;
552 // add all leafs to sorted_nodes
553 for (i = 0; i < num_operators; ++i) {
554 if (!leaf_flags[i] && adj_lists[i].empty()) {
555 leaf_flags[i] = true;
556 sorted_nodes.push_back(node_list[i]);
559 // remove the node from its parents adjecency lists
560 for (int j = 0; j < num_operators; ++j) {
561 list<int>::iterator iter;
562 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
564 adj_lists[j].erase(iter);
576 // set the parameter block for every operator in tree
577 for (i = 0; i < num_operators; ++i)
578 if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
581 fprintf(stderr,"Instantiate FTAs\n");
583 /* extract lfta param block from hfta param block */
584 // NOTE: param_list must line up with lfta_list
585 list<param_block> param_list;
586 get_lfta_params(sz, value, param_list);
587 list<param_block>::iterator iter1;
588 list<lfta_info*>::iterator iter2 = lfta_list->begin();
590 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
591 lfta_info* inf = *iter2;
594 fprintf(stderr,"Instantiate a FTA\n");
597 gs_uint32_t reuse_flag = 2;
599 // we will try to create a new instance of child FTA only if it is parameterized
600 if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
601 (*iter2)->f.streamid = 0; // not interested in existing instances
604 if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
605 fprintf(stderr,"HFTA::error:could instantiate a FTA");
612 //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
614 _fta.stream_subscribed[i]=(*iter2)->f;
616 _fta.stream_subscribed_cnt = i;
620 _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
621 _fta.free_fta = MULTOP_HFTA_free_fta;
622 _fta.control_fta = MULTOP_HFTA_control_fta;
623 _fta.accept_packet = MULTOP_HFTA_accept_packet;
624 _fta.clock_fta = MULTOP_HFTA_clock_fta;
626 // init runtime stats
636 list<lfta_info*>::iterator iter;
639 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
643 delete root; // free operators memory
// Flush fragment: run every operator in topological order, draining each
// node's input queue into its parents until the root produces final output.
651 list<host_tuple> res;
653 // go through the list of operators in topological order
655 list<host_tuple>::iterator iter;
656 list<host_tuple> temp_output_queue;
658 for (int i = 0; i < num_operators; ++i) {
659 operator_node* node = sorted_nodes[i];
// The last (root) operator writes directly into the final result list.
662 list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
664 // for trees we can put output tuples directly into parent's input buffer
665 list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
667 // consume tuples waiting in your queue
668 for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
669 node->op->accept_tuple(*(iter), current_output_queue);
670 node->op->flush(current_output_queue);
672 node->input_queue.clear();
675 // copy the tuples from output queue into input queues of all parents
676 list<operator_node*>::iterator node_iter;
678 if (!node->parent_list.empty()) {
679 // append the content of the output queue to parent input queue
681 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
// With multiple parents, share the tuple via a heap ref-count so the
// payload is freed only after the last parent consumes it.
683 if (node->parent_list.size() > 1) {
684 ref_cnt = (int*)malloc(sizeof(int));
685 *ref_cnt = node->parent_list.size() - 1;
688 for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
689 (*iter).ref_cnt = ref_cnt;
690 (*node_iter)->input_queue.push_back(*iter);
698 // go through the list of returned tuples and finalize them
699 list<host_tuple>::iterator iter = res.begin();
700 while (iter != res.end()) {
701 host_tuple& tup = *iter;
703 // finalize the tuple
707 output_queue_mem += tup.tuple_size;
711 // append returned list to output_queue
712 output_queue.splice(output_queue.end(), res);
// Post queued tuples until the ring buffer refuses one (0 == success),
// tracking queue memory as tuples leave.
716 while (!output_queue.empty()) {
717 host_tuple& tup = output_queue.front();
719 fprintf(stderr, "HFTA::about to post tuple\n");
721 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
722 output_queue_mem -= tup.tuple_size;
724 output_queue.pop_front();
// Reports whether construction failed (e.g. set_param_block error).
733 bool init_failed(){return failed;}
// free_fta callback: tears down a MULTOP_HFTA, releasing every child lfta
// instance it subscribed to.
737 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
738 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor
739 // will be called on program exit
742 // free instance we are subscribed to
743 list<lfta_info*>::iterator iter;
746 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
747 fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
// control_fta callback: fans FLUSH / LOAD_PARAMS commands out to every child
// lfta, then applies them to the local operator tree.
755 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
756 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
758 if (command == FTA_COMMAND_FLUSH) {
760 // ask lftas to do the flush
761 list<lfta_info*>::iterator iter;
762 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
763 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
765 // flush hfta operators
768 } else if (command == FTA_COMMAND_LOAD_PARAMS) {
// Split the HFTA parameter block into per-lfta parameter blocks
// (param_list lines up positionally with lfta_list).
770 list<param_block> param_list;
771 get_lfta_params(sz, value, param_list);
773 // ask lftas to do the flush and set new parameters
774 list<lfta_info*>::iterator iter;
775 list<param_block>::iterator iter2;
776 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
777 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
778 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
781 // flush hfta operators
784 // set the new parameter block for every operator in tree
785 for (int i = 0; i < ftap->num_operators; ++i)
786 ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
788 } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
789 // we no longer use temp_status commands
790 // heartbeat mechanism is used instead
// accept_packet callback: identifies the source lfta, routes the tuple through
// the operator tree/DAG in topological order, handles temporal/EOF tuples,
// and posts the finalized results. Also accumulates runtime statistics.
795 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
796 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
// Cycle counting for the stats heartbeat.
798 gs_uint64_t start_cycle = rdtsc();
800 fprintf(stderr, "HFTA::accepted packet\n");
802 if (!length) /* ignore null tuples */
805 ftap->in_tuple_cnt++;
// Wrap the packet without copying; the tuple points into the caller's buffer.
808 temp.tuple_size = length;
811 temp.heap_resident = false;
813 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
815 // find from which lfta the tuple came
816 list<lfta_info*>::iterator iter;
817 lfta_info* inf = NULL;
// Match the sender's full FTAID (ip, port, index, streamid) against our
// subscription table.
820 for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
821 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&
822 ftap->_fta.stream_subscribed[i].port == ftaid->port &&
823 ftap->_fta.stream_subscribed[i].index == ftaid->index &&
824 ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
831 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
835 // route tuple through operator chain
836 list<host_tuple> result;
840 temp.channel = inf->output_channel;
842 operator_node* current_node = NULL, *child = NULL;
843 list<host_tuple> temp_output_queue;
// Heartbeat trace state extracted from a temporal tuple, if any.
846 fta_stat* tup_trace = NULL;
847 gs_uint32_t tup_trace_sz = 0;
848 gs_uint64_t trace_id = 0;
849 bool temp_tuple_received = false;
851 // if the tuple is temporal we need to extract the heartbeat payload
852 if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
853 temp_tuple_received = true;
854 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
855 fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
// EOF handling: wait until every source lfta has reported EOF, then flush
// and forward a single EOF tuple upstream.
858 if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
860 if (++ftap->num_eof_tuples < ftap->lfta_list->size())
863 ftap->num_eof_tuples = 0;
865 /* perform a flush */
868 /* post eof_tuple to a parent */
869 host_tuple eof_tuple;
870 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
872 /* last byte of the tuple specifies the tuple type
873 * set it to EOF_TUPLE
875 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
876 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
877 ftap->out_tuple_cnt++;
878 ftap->out_tuple_sz+=eof_tuple.tuple_size;
883 list<host_tuple>::iterator iter2;
887 // push tuple to all parent operators of the lfta
888 list<operator_node*>::iterator node_iter;
889 list<unsigned>::iterator chan_iter;
890 for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
891 temp.channel = *chan_iter;
892 (*node_iter)->input_queue.push_back(temp);
// DAG routing pass: run operators in topological order, fanning each node's
// output into all of its parents' input queues.
895 for (i = 0; i < ftap->num_operators; ++i) {
897 operator_node* node = ftap->sorted_nodes[i];
// The last (root) operator writes directly into the final result list.
898 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
900 // consume tuples waiting in your queue
901 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
902 node->op->accept_tuple(*(iter2), current_output_queue);
904 node->input_queue.clear();
906 // copy the tuples from output queue into input queues of all parents
908 if (!node->parent_list.empty()) {
910 // append the content of the output queue to parent input queue
911 for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
// With multiple parents, share the tuple via a heap ref-count so the
// payload is freed only after the last parent consumes it.
914 if (node->parent_list.size() > 1) {
915 ref_cnt = (int*)malloc(sizeof(int));
916 *ref_cnt = node->parent_list.size() - 1;
919 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
920 (*iter2).ref_cnt = ref_cnt;
921 (*iter2).channel = *chan_iter;
922 (*node_iter)->input_queue.push_back(*iter2);
926 temp_output_queue.clear();
// Tree routing path (single-parent plans): walk parent pointers to the root.
929 current_node = inf->parent;
931 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
932 current_node->input_queue.push_back(temp);
935 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
936 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
938 // consume tuples waiting in your queue
939 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
940 current_node->op->accept_tuple((*iter2),current_output_queue);
942 // All consumed, delete them
943 current_node->input_queue.clear();
944 current_node = current_node->parent;
946 } while (current_node);
952 bool no_temp_tuple = false;
954 // if we received temporal tuple, last tuple of the result must be temporal too
955 // we need to extend the trace by adding ftaid and send a heartbeat to clearinghouse
956 if (temp_tuple_received) {
958 if (result.empty()) {
959 no_temp_tuple = true;
// NOTE(review): temp_tup is declared in elided lines; this copies the back
// of result rather than referencing it (unlike the UNOP version).
963 temp_tup = result.back();
964 finalize_tuple(temp_tup);
// Grow the temporal tuple: [original tuple][trace_id][old trace][our fta_stat].
966 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
967 char* new_data = (char*)malloc(new_tuple_size);
968 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
969 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
970 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
// Append our own fta_stat entry to the trace, carrying this HFTA's id
// and the accumulated runtime counters.
973 memset((char*)&stats, 0, sizeof(fta_stat));
974 stats.ftaid = fta->ftaid;
975 stats.in_tuple_cnt = ftap->in_tuple_cnt;
976 stats.out_tuple_cnt = ftap->out_tuple_cnt;
977 stats.out_tuple_sz = ftap->out_tuple_sz;
978 stats.cycle_cnt = ftap->cycle_cnt;
979 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
981 // Send a heartbeat message to clearinghouse.
982 fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
// Counters reset after each reported heartbeat.
985 ftap->in_tuple_cnt = 0;
986 ftap->out_tuple_cnt = 0;
987 ftap->out_tuple_sz = 0;
991 temp_tup.data = new_data;
992 temp_tup.tuple_size = new_tuple_size;
997 // go through the list of returned tuples and finalize them
998 // since we can produce multiple temporal tuples in DAG plans
999 // we can drop all of them except the last one
1000 iter2 = result.begin();
1001 while(iter2 != result.end()) {
1002 host_tuple tup = *iter2;
1004 // finalize the tuple
1005 if (tup.tuple_size) {
1006 finalize_tuple(tup);
1009 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
1014 ftap->output_queue.push_back(tup);
1015 ftap->output_queue_mem += tup.tuple_size;
1022 // append returned list to output_queue
1023 // ftap->output_queue.splice(ftap->output_queue.end(), result);
// The extended temporal tuple goes out last, after the regular results.
1025 if (temp_tuple_received && !no_temp_tuple) {
1026 ftap->output_queue.push_back(temp_tup);
1027 ftap->output_queue_mem += temp_tup.tuple_size;
// Post queued tuples until the ring buffer refuses one (0 == success),
// updating the output counters and queue-memory accounting.
1031 while (!ftap->output_queue.empty()) {
1032 host_tuple& tup = ftap->output_queue.front();
1034 fprintf(stderr, "HFTA::about to post tuple\n");
1036 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
1037 ftap->out_tuple_cnt++;
1038 ftap->out_tuple_sz+=tup.tuple_size;
1039 ftap->output_queue_mem -= tup.tuple_size;
1041 ftap->output_queue.pop_front();
1046 ftap->cycle_cnt += rdtsc() - start_cycle;
// clock_fta callback: dumps runtime statistics to stderr, sends them to the
// clearinghouse in a heartbeat, then resets the counters.
1051 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1052 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
// Diagnostic dump of per-interval counters.
1056 fprintf(stderr, "FTA = %s|", ftap->fta_name);
1057 fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1058 fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1059 fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1060 fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
// Per-operator memory footprint, plus the total.
1063 fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1064 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1066 for (int i = 1; i < ftap->num_operators; ++i) {
1067 operator_node* node = ftap->sorted_nodes[i];
1068 fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1069 total_mem += node->op->get_mem_footprint();
1071 fprintf(stderr, ", total = %d|", total_mem );
1072 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
// Build the fta_stat payload for the heartbeat from the same counters.
1076 memset((char*)&stats, 0, sizeof(fta_stat));
1077 stats.ftaid = fta->ftaid;
1078 stats.in_tuple_cnt = ftap->in_tuple_cnt;
1079 stats.out_tuple_cnt = ftap->out_tuple_cnt;
1080 stats.out_tuple_sz = ftap->out_tuple_sz;
1081 stats.cycle_cnt = ftap->cycle_cnt;
1083 // Send a heartbeat message to clearinghouse.
1084 fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1086 // resets runtime stats
1087 ftap->in_tuple_cnt = 0;
1088 ftap->out_tuple_cnt = 0;
1089 ftap->out_tuple_sz = 0;
1090 ftap->cycle_cnt = 0;