1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
20 #include "host_tuple.h"
21 #include "base_operator.h"
// Hash a 64-bit unsigned value (passed BY POINTER) by XOR-folding its
// high and low 32-bit halves.  The argument is dereferenced twice, so it
// must be a side-effect-free pointer expression.
#define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))

// Type-specific min/max helpers used by generated HFTA query code.
// Classic two-argument macros: each argument is evaluated twice, so do not
// pass expressions with side effects (e.g. i++).
#define ULLMIN(x,y) (unsigned long long)(((x)<(y)?(x):(y)))
#define ULLMAX(x,y) (unsigned long long)(((x)<(y)?(y):(x)))
#define LLMIN(x,y) (long long int)(((x)<(y)?(x):(y)))
#define LLMAX(x,y) (long long int)(((x)<(y)?(y):(x)))
#define UMIN(x,y) (unsigned int)(((x)<(y)?(x):(y)))
#define UMAX(x,y) (unsigned int)(((x)<(y)?(y):(x)))
#define LMIN(x,y) (int)(((x)<(y)?(x):(y)))
#define LMAX(x,y) (int)(((x)<(y)?(y):(x)))
#define FMIN(x,y) (double)(((x)<(y)?(x):(y)))
#define FMAX(x,y) (double)(((x)<(y)?(y):(x)))
// Comparison predicates emitted by the query compiler.
#define EQ(x,y) ((x)==(y))
#define GEQ(x,y) ((x)>=(y))
#define GE(x,y) ((x)>(y))
#define LEQ(x,y) ((x)<=(y))
#define LE(x,y) ((x)<(y))

// Conditional selection: yield y when x is nonzero, otherwise z, cast to
// the result type indicated by the suffix (f=double, ll/ul=64-bit,
// u/i=32-bit).  Only one of y/z is evaluated, as with ?:.
#define if_else_f(x,y,z) (double)(((x)==0?(z):(y)))
#define if_else_ll(x,y,z) (long long int)(((x)==0?(z):(y)))
#define if_else_ul(x,y,z) (unsigned long long)(((x)==0?(z):(y)))
#define if_else_u(x,y,z) (unsigned int)(((x)==0?(z):(y)))
#define if_else_i(x,y,z) (int)(((x)==0?(z):(y)))
// Cast away temporality
// (identity macro: strips the "temporal" attribute at the query-language
// level only; the value itself is unchanged)
#define non_temporal(x)(x)

// Byte-swap a 32-bit unsigned value (big-endian <-> little-endian).
// The argument is evaluated four times; pass only side-effect-free expressions.
#define endian_swap_ui(x) ( (( (x) & 0xFF000000) >> 24) | (( (x) & 0x00FF0000) >> 8) | (( (x) & 0x0000FF00) << 8) | (( (x) & 0x000000FF) << 24) )
62 // Access math libraries
63 #define sqrt(x) sqrt(x)
64 #define pow(x,y) pow((x),(y))
68 #define asin(x) asin(x)
69 #define acos(x) acos(x)
70 #define atan(x) atan(x)
72 #define log2(x) log2(x)
73 #define log10(x) log10(x)
74 #define ceil(x) ceil(x)
75 #define floor(x) floor(x)
76 #define fmod(x) fmod(x)
77 #define trunc(x) trunc(x)
#include <schemaparser.h>

// (fragment) size in bytes of one serialized parameter block
gs_int32_t block_length;

// forward declaration of operator_node

// (fragment of lfta_info) schema handle of the low-level FTA's output
// stream, plus the operator nodes that consume that stream and the input
// channel each of them reads it on.
gs_schemahandle_t schema_handle;
list<operator_node*> parent_list;
list<unsigned> out_channel_list;
// single-parent shortcut kept in parallel with parent_list (see the
// set_*_lfta methods of operator_node, which assign both)
operator_node* parent;
unsigned output_channel;

// (fragment, destructor) release the schema handle if one was allocated
if (schema_handle >= 0)
	ftaschema_free(schema_handle);
// One node of the HFTA query plan: a query operator plus its links to
// parent operators and to the child operators / low-level FTAs that feed
// it.  (Fragmented excerpt: some members and closing braces fall outside
// this view.)
struct operator_node {
	// operators that consume this node's output, and the input channel
	// (0 = left, 1 = right) each of them reads this stream on
	list<operator_node*> parent_list;
	list<unsigned> out_channel_list;
	// single-parent shortcut used by tree-shaped (non-DAG) plans
	operator_node* parent;
	// inputs: child operators and/or low-level FTAs
	operator_node* left_child;
	operator_node* right_child;
	lfta_info* left_lfta;
	lfta_info* right_lfta;
	// tuples delivered to this operator but not yet consumed
	list<host_tuple> input_queue;

	operator_node(base_operator* o) {
		left_child = right_child = NULL;
		left_lfta = right_lfta = NULL;

	// Attach `child` as the left input: it will emit on channel 0.
	void set_left_child_node(operator_node* child) {
		child->parent_list.push_back(this);
		child->out_channel_list.push_back(0);
		child->parent = this;
		child->op->set_output_channel(0);

	// Attach `child` as the right input: it will emit on channel 1.
	void set_right_child_node(operator_node* child) {
		child->parent_list.push_back(this);
		child->out_channel_list.push_back(1);
		child->parent = this;
		child->op->set_output_channel(1);

	// Attach a low-level FTA as the left input (channel 0).
	// NOTE(review): the parameter l_lfta is unused in the visible lines;
	// presumably a hidden line assigns left_lfta = l_lfta first -- confirm.
	void set_left_lfta(lfta_info* l_lfta) {
		left_lfta->parent_list.push_back(this);
		left_lfta->out_channel_list.push_back(0);
		left_lfta->parent = this;
		left_lfta->output_channel = 0;

	// Attach a low-level FTA as the right input (channel 1).
	void set_right_lfta(lfta_info* r_lfta) {
		right_lfta->parent_list.push_back(this);
		right_lfta->out_channel_list.push_back(1);
		right_lfta->parent = this;
		right_lfta->output_channel = 1;
215 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
216 void finalize_tuple(host_tuple &tup);
217 void finalize_tuple(host_tuple &tup);
218 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
219 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
220 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
221 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
223 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
224 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
225 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
226 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
// (fragment of class UNOP_HFTA: host FTA hosting a single query operator)
// schema handle describing this HFTA's output tuples
gs_schemahandle_t schema_handle;

// tuples produced but not yet successfully posted upstream
list<host_tuple> output_queue;

// To create an hfta we will need all the parameters passed to alloc_fta by host library plus
// lfta_name and an instance of the base_operator
// We don't need to know the output schema as this information is already embeded
// in create_output_tuple method of operators' functor.
UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
	gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
	schema_handle = sch_handle;
	// use our own address as the stream id; the callbacks cast it back
	_fta.ftaid.streamid = (gs_p_t)this;
	fprintf(stderr,"Instantiate a FTA\n");
	/* extract lfta param block from hfta param block */
	list<param_block> param_list;
	get_lfta_params(sz, value, param_list);
	param_block param = param_list.front();
	gs_uint32_t reuse_flag = 2;
	// we will try to create a new instance of child FTA only if it is parameterized
	if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
		f.streamid = 0; // not interested in existing instances
	// allocate/subscribe to the child (low-level) FTA instance
	if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
		fprintf(stderr,"HFTA::error:could instantiate a FTA");
	// set the operator's parameters
	// NOTE(review): stray double semicolon below -- harmless but sloppy
	if(oper->set_param_block(sz, (void*)value)) failed = true;;
	fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
	_fta.stream_subscribed_cnt = 1;
	_fta.stream_subscribed[0] = f;
	// wire up the FTA callback table
	_fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
	_fta.free_fta = UNOP_HFTA_free_fta;
	_fta.control_fta = UNOP_HFTA_control_fta;
	_fta.accept_packet = UNOP_HFTA_accept_packet;
	_fta.clock_fta = UNOP_HFTA_clock_fta;

// (fragment; enclosing context not visible -- presumably the destructor)
	delete oper; // free operators memory

// (fragment, flush path) drain the operator and queue its output
	list<host_tuple> res;
	if (!oper->flush(res)) {
	// go through the list of returned tuples and finalize them
	list<host_tuple>::iterator iter = res.begin();
	while (iter != res.end()) {
		host_tuple& tup = *iter;
		// finalize the tuple
	// append returned list to output_queue
	output_queue.splice(output_queue.end(), res);
	// post queued tuples upstream; dequeue only those accepted
	while (!output_queue.empty()) {
		host_tuple& tup = output_queue.front();
		fprintf(stderr, "HFTA::about to post tuple\n");
		if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
			output_queue.pop_front();

// true when construction or parameter loading failed
bool init_failed(){return failed;}
// Tear down a UNOP HFTA: release the subscribed child FTA instance
// (recursively if requested) and destroy the wrapper object.
gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
	UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor
	// will be called on program exit

	// free instance we are subscribed to
	fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
// Handle a control command for a UNOP HFTA by forwarding it to the child
// lfta and (for LOAD_PARAMS) re-parameterizing the hosted operator.
gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
	UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
	if (command == FTA_COMMAND_FLUSH) {
		// ask lfta to do the flush
		fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
	} else if (command == FTA_COMMAND_LOAD_PARAMS) {
		// flush the lfta before loading the new parameters
		// NOTE(review): this FLUSH passes (sz, value) while the MULTOP
		// version passes (0, NULL) -- confirm which is intended
		fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
		/* extract lfta param block from hfta param block */
		list<param_block> param_list;
		get_lfta_params(sz, value, param_list);
		param_block param = param_list.front();
		// load new parameters into lfta
		fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
		// notify the operator about the change of parameter
		ftap->oper->set_param_block(sz, (void*)value);
	} else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
		// we no longer use temp_status commands
		// heartbeat mechanism is used instead
// Accept one tuple from the child lfta: feed it to the hosted operator,
// finalize and queue the results, extend the trace of temporal tuples,
// and post everything possible upstream.  (Fragmented excerpt.)
gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
	UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
	fprintf(stderr, "HFTA::accepted packet\n");
	if (!length) /* ignore null tuples */
	// wrap the raw packet as a non-heap-resident host_tuple
	temp.tuple_size = length;
	temp.heap_resident = false;
	// pass the tuple to operator
	list<host_tuple> res;
	// trace bookkeeping for temporal (heartbeat-carrying) tuples
	fta_stat* tup_trace = NULL;
	gs_uint32_t tup_trace_sz = 0;
	gs_uint64_t trace_id = 0;
	bool temp_tuple_received = false;
	// if the tuple is temporal we need to extract the heartbeat payload
	if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
		temp_tuple_received = true;
		if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
			fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
	// EOF from the source: flush and propagate an EOF tuple upstream
	if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
		/* perform a flush */
		/* post eof_tuple to a parent */
		host_tuple eof_tuple;
		ftap->oper->get_temp_status(eof_tuple);
		/* last byte of the tuple specifies the tuple type
		 * set it to EOF_TUPLE
		*((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
		hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
	// normal path: hand the tuple to the operator
	ret = ftap->oper->accept_tuple(temp, res);
	// go through the list of returned tuples and finalize them
	list<host_tuple>::iterator iter = res.begin();
	while (iter != res.end()) {
		host_tuple& tup = *iter;
		// finalize the tuple
	// if we received temporal tuple, last tuple of the result must be temporal too
	// we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
	if (temp_tuple_received) {
		host_tuple& temp_tup = res.back();
		// rebuild the tuple with trace_id + old trace + one new fta_stat slot
		int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
		char* new_data = (char*)malloc(new_tuple_size);
		memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
		memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
		memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
		// NOTE(review): the FTAID copied below is immediately overwritten
		// by the zeroed stats memcpy at the same offset; the MULTOP path
		// instead fills stats.ftaid and counters -- looks like a bug here,
		// left untouched because surrounding lines are not visible.
		memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
		memset((char*)&stats, 0, sizeof(fta_stat));
		memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
		// Send a heartbeat message to clearinghouse.
		fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
		// swap the extended buffer into the result tuple
		temp_tup.free_tuple();
		temp_tup.data = new_data;
		temp_tup.tuple_size = new_tuple_size;
	// append returned list to output_queue
	ftap->output_queue.splice(ftap->output_queue.end(), res);
	// post queued tuples upstream; dequeue only those accepted
	while (!ftap->output_queue.empty()) {
		host_tuple& tup = ftap->output_queue.front();
		fprintf(stderr, "HFTA::about to post tuple\n");
		if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
			ftap->output_queue.pop_front();
// Periodic clock tick for a UNOP HFTA: just prove liveness.
gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
	// Send a heartbeat message to clearinghouse to indicate we are alive
	fta_heartbeat(ftap->ftaid, 0, 0, 0);
// (fragment of class MULTOP_HFTA: host FTA hosting a tree/DAG of operators)
// schema handle describing this HFTA's output tuples
gs_schemahandle_t schema_handle;
// operators in topological (children-first) order; execution walks this
vector<operator_node*> sorted_nodes;
// the low-level FTAs feeding the leaves of the plan
list<lfta_info*> *lfta_list;
/* number of eof tuples we received so far
 * receiving eof tuples from every source fta will cause a flush
// tuples produced but not yet successfully posted upstream
list<host_tuple> output_queue;
// runtime statistics reported via heartbeats
gs_uint32_t in_tuple_cnt;
gs_uint32_t out_tuple_cnt;
gs_uint32_t out_tuple_sz;
gs_uint64_t cycle_cnt;
gs_uint64_t trace_id;
// memory occupied by output queue
gs_uint32_t output_queue_mem;

// To create an hfta we will need all the parameters passed to alloc_fta by host library plus
// lfta_name and an instance of the base_operator. We don't need to know the schema for lfta,
// as the schema handle is already passed during operator creation time.
// We also don't need to know the output schema as this information is already embeded
// in create_output_tuple method of operators' functor.
MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
	list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
	// use our own address as the stream id; the callbacks cast it back
	_fta.ftaid.streamid = (gs_p_t)this;
	schema_handle = sch_handle;
	output_queue_mem = 0;
	// topologically sort the operators in the tree (or DAG)
	// for DAG we make sure we add the node to the sorted list only once
	operator_node* current_node;
	map<operator_node*, int> node_map;
	vector<operator_node*> node_list;
	// breadth-first discovery of all reachable operator nodes
	node_list.push_back(root);
	while (i < node_list.size()) {
		current_node = node_list[i];
		if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
			node_map[current_node->left_child] = num_operators++;
			node_list.push_back(current_node->left_child);
		if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
			node_map[current_node->right_child] = num_operators++;
			node_list.push_back(current_node->right_child);
	// build adjacency lists for query DAG
	list<int>* adj_lists = new list<int>[num_operators];
	bool* leaf_flags = new bool[num_operators];
	memset(leaf_flags, 0, num_operators * sizeof(bool));
	for (i = 0; i < num_operators; ++i) {
		current_node = node_list[i];
		if (current_node->left_child) {
			adj_lists[i].push_back(node_map[current_node->left_child]);
		if (current_node->right_child && current_node->left_child != current_node->right_child) {
			adj_lists[i].push_back(node_map[current_node->right_child]);
	// run topological sort
	bool leaf_found = true;
	// add all leafs to sorted_nodes
	for (i = 0; i < num_operators; ++i) {
		if (!leaf_flags[i] && adj_lists[i].empty()) {
			leaf_flags[i] = true;
			sorted_nodes.push_back(node_list[i]);
	// remove the node from its parents adjacency lists
	for (int j = 0; j < num_operators; ++j) {
		list<int>::iterator iter;
		for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
			adj_lists[j].erase(iter);
	// set the parameter block for every operator in tree
	for (i = 0; i < num_operators; ++i)
		if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
	fprintf(stderr,"Instantiate FTAs\n");
	/* extract lfta param block from hfta param block */
	// NOTE: param_list must line up with lfta_list
	list<param_block> param_list;
	get_lfta_params(sz, value, param_list);
	list<param_block>::iterator iter1;
	list<lfta_info*>::iterator iter2 = lfta_list->begin();
	// allocate/subscribe to every child (low-level) FTA instance
	for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
		lfta_info* inf = *iter2;
		fprintf(stderr,"Instantiate a FTA\n");
		gs_uint32_t reuse_flag = 2;
		// we will try to create a new instance of child FTA only if it is parameterized
		if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
			(*iter2)->f.streamid = 0; // not interested in existing instances
		if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
			fprintf(stderr,"HFTA::error:could instantiate a FTA");
		//fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
		_fta.stream_subscribed[i]=(*iter2)->f;
	_fta.stream_subscribed_cnt = i;
	// wire up the FTA callback table
	_fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
	_fta.free_fta = MULTOP_HFTA_free_fta;
	_fta.control_fta = MULTOP_HFTA_control_fta;
	_fta.accept_packet = MULTOP_HFTA_accept_packet;
	_fta.clock_fta = MULTOP_HFTA_clock_fta;
	// init runtime stats

// (fragment; presumably the destructor) release subscribed child FTAs
	list<lfta_info*>::iterator iter;
	for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
	delete root; // free operators memory

// (fragment, flush path) drain every operator in topological order
	list<host_tuple> res;
	// go through the list of operators in topological order
	list<host_tuple>::iterator iter;
	list<host_tuple> temp_output_queue;
	for (int i = 0; i < num_operators; ++i) {
		operator_node* node = sorted_nodes[i];
		// intermediate operators write into temp_output_queue (DAG case);
		// the last (root) operator writes directly into res
		list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
		// for trees we can put output tuples directly into parent's input buffer
		list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
		// consume tuples waiting in your queue
		for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
			node->op->accept_tuple(*(iter), current_output_queue);
		node->op->flush(current_output_queue);
		node->input_queue.clear();
		// copy the tuples from output queue into input queues of all parents
		list<operator_node*>::iterator node_iter;
		if (!node->parent_list.empty()) {
			// append the content of the output queue to parent input queue
			for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
				// shared tuples get a reference count so each parent can
				// release its copy independently
				if (node->parent_list.size() > 1) {
					ref_cnt = (int*)malloc(sizeof(int));
					*ref_cnt = node->parent_list.size() - 1;
				for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
					(*iter).ref_cnt = ref_cnt;
					(*node_iter)->input_queue.push_back(*iter);
	// go through the list of returned tuples and finalize them
	list<host_tuple>::iterator iter = res.begin();
	while (iter != res.end()) {
		host_tuple& tup = *iter;
		// finalize the tuple
		output_queue_mem += tup.tuple_size;
	// append returned list to output_queue
	output_queue.splice(output_queue.end(), res);
	// post queued tuples upstream; dequeue only those accepted
	while (!output_queue.empty()) {
		host_tuple& tup = output_queue.front();
		fprintf(stderr, "HFTA::about to post tuple\n");
		if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
			output_queue_mem -= tup.tuple_size;
			output_queue.pop_front();

// true when construction or parameter loading failed
bool init_failed(){return failed;}
// Tear down a MULTOP HFTA: release every subscribed child FTA instance
// (recursively if requested) and destroy the wrapper object.
gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
	MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor
	// will be called on program exit

	// free instance we are subscribed to
	list<lfta_info*>::iterator iter;
	for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
		fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
// Handle a control command for a MULTOP HFTA by fanning it out to every
// child lfta and (for LOAD_PARAMS) to every hosted operator.
gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
	MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
	if (command == FTA_COMMAND_FLUSH) {
		// ask lftas to do the flush
		list<lfta_info*>::iterator iter;
		for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
			fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
		// flush hfta operators
	} else if (command == FTA_COMMAND_LOAD_PARAMS) {
		// param_list is assumed to line up pairwise with lfta_list
		list<param_block> param_list;
		get_lfta_params(sz, value, param_list);
		// ask lftas to do the flush and set new parameters
		list<lfta_info*>::iterator iter;
		list<param_block>::iterator iter2;
		for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
			fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
			fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
		// flush hfta operators
		// set the new parameter block for every operator in tree
		for (int i = 0; i < ftap->num_operators; ++i)
			ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
	} else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
		// we no longer use temp_status commands
		// heartbeat mechanism is used instead
// Accept one tuple from a child lfta: identify its source stream, route it
// through the operator tree/DAG in topological order, finalize and queue
// the results, extend the trace of temporal tuples, and post everything
// possible upstream.  (Fragmented excerpt.)
gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
	MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
	// cycle accounting for the heartbeat statistics
	gs_uint64_t start_cycle = rdtsc();
	fprintf(stderr, "HFTA::accepted packet\n");
	if (!length) /* ignore null tuples */
	ftap->in_tuple_cnt++;
	// wrap the raw packet as a non-heap-resident host_tuple
	temp.tuple_size = length;
	temp.heap_resident = false;
//	fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
	// find from which lfta the tuple came
	list<lfta_info*>::iterator iter;
	lfta_info* inf = NULL;
	for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
		// match on the full FTAID (ip, port, index, streamid)
		if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&
			ftap->_fta.stream_subscribed[i].port == ftaid->port &&
			ftap->_fta.stream_subscribed[i].index == ftaid->index &&
			ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
	fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
	// route tuple through operator chain
	list<host_tuple> result;
	temp.channel = inf->output_channel;
	operator_node* current_node = NULL, *child = NULL;
	list<host_tuple> temp_output_queue;
	// trace bookkeeping for temporal (heartbeat-carrying) tuples
	fta_stat* tup_trace = NULL;
	gs_uint32_t tup_trace_sz = 0;
	gs_uint64_t trace_id = 0;
	bool temp_tuple_received = false;
	// if the tuple is temporal we need to extract the heartbeat payload
	if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
		temp_tuple_received = true;
		if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
			fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
	// EOF handling: only flush once EVERY source lfta has sent its EOF
	if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
		if (++ftap->num_eof_tuples < ftap->lfta_list->size())
		ftap->num_eof_tuples = 0;
		/* perform a flush */
		/* post eof_tuple to a parent */
		host_tuple eof_tuple;
		// root operator (last in topological order) supplies the template
		ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
		/* last byte of the tuple specify the tuple type
		 * set it to EOF_TUPLE
		*((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
		hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
		ftap->out_tuple_cnt++;
		ftap->out_tuple_sz+=eof_tuple.tuple_size;
	list<host_tuple>::iterator iter2;
	// DAG routing: push the tuple to every parent operator of the lfta,
	// tagged with the channel that parent reads it on
	// push tuple to all parent operators of the lfta
	list<operator_node*>::iterator node_iter;
	list<unsigned>::iterator chan_iter;
	for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
		temp.channel = *chan_iter;
		(*node_iter)->input_queue.push_back(temp);
	// run the operators in topological order over their input queues
	for (i = 0; i < ftap->num_operators; ++i) {
		operator_node* node = ftap->sorted_nodes[i];
		// the root (last) operator writes into result, others into a temp
		list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
		// consume tuples waiting in your queue
		for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
			node->op->accept_tuple(*(iter2), current_output_queue);
		node->input_queue.clear();
		// copy the tuples from output queue into input queues of all parents
		if (!node->parent_list.empty()) {
			// append the content of the output queue to parent input queue
			for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
				// shared tuples get a reference count so each parent can
				// release its copy independently
				if (node->parent_list.size() > 1) {
					ref_cnt = (int*)malloc(sizeof(int));
					*ref_cnt = node->parent_list.size() - 1;
				for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
					(*iter2).ref_cnt = ref_cnt;
					(*iter2).channel = *chan_iter;
					(*node_iter)->input_queue.push_back(*iter2);
		temp_output_queue.clear();
	// tree routing: walk straight up the single-parent chain
	current_node = inf->parent;
//	fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
	current_node->input_queue.push_back(temp);
	//fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
	list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
	// consume tuples waiting in your queue
	for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
		current_node->op->accept_tuple((*iter2),current_output_queue);
	// All consumed, delete them
	current_node->input_queue.clear();
	current_node = current_node->parent;
	} while (current_node);
	bool no_temp_tuple = false;
	// if we received temporal tuple, last tuple of the result must be temporal too
	// we need to extend the trace by adding ftaid and send a heartbeat to clearinghouse
	if (temp_tuple_received) {
		if (result.empty()) {
			no_temp_tuple = true;
		temp_tup = result.back();
		finalize_tuple(temp_tup);
		// rebuild the tuple with trace_id + old trace + one new fta_stat slot
		int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
		char* new_data = (char*)malloc(new_tuple_size);
		memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
		memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
		memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
		// append our own runtime statistics as the trace's final entry
		memset((char*)&stats, 0, sizeof(fta_stat));
		stats.ftaid = fta->ftaid;
		stats.in_tuple_cnt = ftap->in_tuple_cnt;
		stats.out_tuple_cnt = ftap->out_tuple_cnt;
		stats.out_tuple_sz = ftap->out_tuple_sz;
		stats.cycle_cnt = ftap->cycle_cnt;
		memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
		// Send a heartbeat message to clearinghouse.
		fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
		// stats were reported; restart the counters
		ftap->in_tuple_cnt = 0;
		ftap->out_tuple_cnt = 0;
		ftap->out_tuple_sz = 0;
		ftap->cycle_cnt = 0;
		// swap the extended buffer into the result tuple
		free(temp_tup.data);
		temp_tup.data = new_data;
		temp_tup.tuple_size = new_tuple_size;
	// go through the list of returned tuples and finalize them
	// since we can produce multiple temporal tuples in DAG plans
	// we can drop all of them except the last one
	iter2 = result.begin();
	while(iter2 != result.end()) {
		host_tuple tup = *iter2;
		// finalize the tuple
		if (tup.tuple_size) {
			finalize_tuple(tup);
		if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
		ftap->output_queue.push_back(tup);
		ftap->output_queue_mem += tup.tuple_size;
	// append returned list to output_queue
	// ftap->output_queue.splice(ftap->output_queue.end(), result);
	// the (trace-extended) temporal tuple goes out last
	if (temp_tuple_received && !no_temp_tuple) {
		ftap->output_queue.push_back(temp_tup);
		ftap->output_queue_mem += temp_tup.tuple_size;
	// post queued tuples upstream; dequeue only those accepted
	while (!ftap->output_queue.empty()) {
		host_tuple& tup = ftap->output_queue.front();
		fprintf(stderr, "HFTA::about to post tuple\n");
		if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
			ftap->out_tuple_cnt++;
			ftap->out_tuple_sz+=tup.tuple_size;
			ftap->output_queue_mem -= tup.tuple_size;
			ftap->output_queue.pop_front();
	ftap->cycle_cnt += rdtsc() - start_cycle;
// Periodic clock tick for a MULTOP HFTA: dump per-operator statistics to
// stderr, report runtime stats to the clearinghouse via heartbeat, and
// reset the counters.  (Fragmented excerpt; end of function not visible.)
gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
	MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
	fprintf(stderr, "FTA = %s|", ftap->fta_name);
	fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
	fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
	fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
	fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
	// per-operator memory footprints plus the running total
	fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
	unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
	for (int i = 1; i < ftap->num_operators; ++i) {
		operator_node* node = ftap->sorted_nodes[i];
		fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
		total_mem += node->op->get_mem_footprint();
	fprintf(stderr, ", total = %d|", total_mem );
	fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
	// fill an fta_stat with this HFTA's counters for the heartbeat
	memset((char*)&stats, 0, sizeof(fta_stat));
	stats.ftaid = fta->ftaid;
	stats.in_tuple_cnt = ftap->in_tuple_cnt;
	stats.out_tuple_cnt = ftap->out_tuple_cnt;
	stats.out_tuple_sz = ftap->out_tuple_sz;
	stats.cycle_cnt = ftap->cycle_cnt;
	// Send a heartbeat message to clearinghouse.
	fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
	// resets runtime stats
	ftap->in_tuple_cnt = 0;
	ftap->out_tuple_cnt = 0;
	ftap->out_tuple_sz = 0;
	ftap->cycle_cnt = 0;