1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
20 #include "host_tuple.h"
21 #include "base_operator.h"
// NOTE(review): helper macros referenced by generated HFTA query code.
// All function-like macros below evaluate their arguments more than once;
// generated callers must not pass expressions with side effects (e.g. x++).
28 #define hfta_ullong_hashfunc(x) (((int)(*(x)))^((int)((*(x))>>32)))
// Typed min/max for the scalar types the query compiler emits.
31 #define ULLMIN(x,y) (unsigned long long)(((x)<(y)?(x):(y)))
32 #define ULLMAX(x,y) (unsigned long long)(((x)<(y)?(y):(x)))
33 #define LLMIN(x,y) (long long int)(((x)<(y)?(x):(y)))
34 #define LLMAX(x,y) (long long int)(((x)<(y)?(y):(x)))
35 #define UMIN(x,y) (unsigned int)(((x)<(y)?(x):(y)))
36 #define UMAX(x,y) (unsigned int)(((x)<(y)?(y):(x)))
37 #define LMIN(x,y) (int)(((x)<(y)?(x):(y)))
38 #define LMAX(x,y) (int)(((x)<(y)?(y):(x)))
39 #define FMIN(x,y) (double)(((x)<(y)?(x):(y)))
40 #define FMAX(x,y) (double)(((x)<(y)?(y):(x)))
// Comparison predicates used by generated predicates.
43 #define EQ(x,y) ((x)==(y))
44 #define GEQ(x,y) ((x)>=(y))
45 #define GE(x,y) ((x)>(y))
46 #define LEQ(x,y) ((x)<=(y))
47 #define LE(x,y) ((x)<(y))
// Typed conditional: yields y when x is nonzero, else z.
50 #define if_else_f(x,y,z) (double)(((x)==0?(z):(y)))
51 #define if_else_ll(x,y,z) (long long int)(((x)==0?(z):(y)))
52 #define if_else_ul(x,y,z) (unsigned long long)(((x)==0?(z):(y)))
53 #define if_else_u(x,y,z) (unsigned int)(((x)==0?(z):(y)))
54 #define if_else_i(x,y,z) (int)(((x)==0?(z):(y)))
56 // Cast away temporality
57 #define non_temporal(x)(x)
// Byte-swap a 32-bit value (endianness conversion).
60 #define endian_swap_ui(x) ( (( (x) & 0xFF000000) >> 24) | (( (x) & 0x00FF0000) >> 8) | (( (x) & 0x0000FF00) << 8) | (( (x) & 0x000000FF) << 24) )
62 // Access math libraries
// Identity pass-through macros exposing libm names to generated code.
// Function-like macro expansion is non-recursive, so each expands to the
// real library call exactly once.
63 #define sqrt(x) sqrt(x)
64 #define pow(x,y) pow((x),(y))
68 #define asin(x) asin(x)
69 #define acos(x) acos(x)
70 #define atan(x) atan(x)
72 #define log2(x) log2(x)
73 #define log10(x) log10(x)
74 #define ceil(x) ceil(x)
75 #define floor(x) floor(x)
// fmod takes TWO arguments; the previous definition (#define fmod(x) fmod(x))
// had the wrong arity, so any generated call fmod(a,b) failed to preprocess
// ("macro 'fmod' passed 2 arguments, but takes just 1"). Match the two-arg
// pass-through style already used for pow(x,y) above.
76 #define fmod(x,y) fmod((x),(y))
// Identity pass-through for trunc, same pattern as the macros above.
77 #define trunc(x) trunc(x)
85 #include <schemaparser.h>
// NOTE(review): the lines below are members of structs (param_block /
// lfta_info) whose opening declarations fall on elided source lines --
// confirm member ownership against the full file.
89 gs_int32_t block_length;
93 // forward declaration of operator_node
// Schema handle for this lfta's output tuples (parsed via schemaparser).
97 gs_schemahandle_t schema_handle;
// DAG-plan wiring: an lfta may feed multiple parent operator nodes, one
// output channel per parent; the two lists are kept in lock step.
101 list<operator_node*> parent_list;
102 list<unsigned> out_channel_list;
// Tree-plan wiring: a single parent and channel (presumably selected by
// elided #ifdef lines -- TODO confirm).
104 operator_node* parent;
105 unsigned output_channel;
// Destructor fragment: release the schema handle if one was allocated.
123 if (schema_handle >= 0)
124 ftaschema_free(schema_handle);
// Node of the HFTA operator tree/DAG. Wraps a base_operator ('op', declared
// on an elided line) and wires it to child operators or source lftas, and to
// its parent(s). Both DAG-style (parent_list/out_channel_list) and
// tree-style (parent) members appear; elided #ifdef lines presumably select
// between them -- TODO confirm against the full file.
129 struct operator_node {
// DAG wiring: multiple parents, one output channel per parent (lists in
// lock step).
133 list<operator_node*> parent_list;
134 list<unsigned> out_channel_list;
// Tree wiring: single parent.
136 operator_node* parent;
// Children may be other hfta operators or lftas (at most one of each side).
139 operator_node* left_child;
140 operator_node* right_child;
141 lfta_info* left_lfta;
142 lfta_info* right_lfta;
// Tuples queued for this node by its children / source lftas.
144 list<host_tuple> input_queue;
146 operator_node(base_operator* o) {
151 left_child = right_child = NULL;
152 left_lfta = right_lfta = NULL;
// Attach 'child' as the left input; it will emit on channel 0.
159 void set_left_child_node(operator_node* child) {
163 child->parent_list.push_back(this);
164 child->out_channel_list.push_back(0);
166 child->parent = this;
167 child->op->set_output_channel(0);
// Attach 'child' as the right input; it will emit on channel 1.
172 void set_right_child_node(operator_node* child) {
176 child->parent_list.push_back(this);
177 child->out_channel_list.push_back(1);
179 child->parent = this;
180 child->op->set_output_channel(1);
// Attach an lfta as the left input (channel 0). Assumes left_lfta is
// assigned from l_lfta on an elided line -- TODO confirm.
185 void set_left_lfta(lfta_info* l_lfta) {
189 left_lfta->parent_list.push_back(this);
190 left_lfta->out_channel_list.push_back(0);
192 left_lfta->parent = this;
193 left_lfta->output_channel = 0;
// Attach an lfta as the right input (channel 1).
198 void set_right_lfta(lfta_info* r_lfta) {
202 right_lfta->parent_list.push_back(this);
203 right_lfta->out_channel_list.push_back(1);
205 right_lfta->parent = this;
206 right_lfta->output_channel = 1;
// Forward declarations of the helpers and the callbacks installed into
// struct FTA by the UNOP_HFTA / MULTOP_HFTA constructors below.
// get_lfta_params: split an hfta-level param block into per-lfta blocks.
215 int get_lfta_params(gs_int32_t sz, void * value, list<param_block>& lst);
// finalize_tuple: post-process an output tuple before posting.
// (A byte-identical duplicate of this declaration on the following
// original line 217 was removed -- it was redundant.)
216 void finalize_tuple(host_tuple &tup);
218 gs_retval_t UNOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
219 gs_retval_t UNOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
220 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
221 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap);
223 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *ftap, gs_uint32_t recursive);
224 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *ftap, gs_int32_t command, gs_int32_t sz, void * value);
225 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *ftap, FTAID *ftaid, void * packet, gs_int32_t length);
226 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *ftap);
// Schema handle for this hfta's output tuples.
233 gs_schemahandle_t schema_handle;
// Tuples produced but not yet successfully posted upstream.
235 list<host_tuple> output_queue;
237 // To create an hfta we will need all the parameters passed to alloc_fta by host library plus
238 // lfta_name and an instance of the base_operator
239 // We don't need to know the output schema as this information is already embedded
240 // in create_output_tuple method of operators' functor.
241 UNOP_HFTA(struct FTAID ftaid, FTAID child_ftaid, gs_int32_t command, gs_int32_t sz, void * value, base_operator* op,
242 gs_csp_t fta_name, char* schema, gs_schemahandle_t sch_handle, bool fta_reusable, gs_uint32_t reuse_option) {
247 schema_handle = sch_handle;
// Use our own address as the stream id so callbacks can recover 'this'.
251 _fta.ftaid.streamid = (gs_p_t)this;
254 fprintf(stderr,"Instantiate a FTA\n");
256 /* extract lfta param block from hfta param block */
257 list<param_block> param_list;
258 get_lfta_params(sz, value, param_list);
// NOTE(review): front() on an empty list is undefined behavior; presumably
// get_lfta_params always yields at least one block -- TODO confirm.
259 param_block param = param_list.front();
262 gs_uint32_t reuse_flag = 2;
263 // we will try to create a new instance of child FTA only if it is parameterized
264 if (param.block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
265 f.streamid = 0; // not interested in existing instances
269 if ((fta_alloc_instance(_fta.ftaid, &f,fta_name,schema,reuse_flag, FTA_COMMAND_LOAD_PARAMS,param.block_length,param.data))!=0) {
270 fprintf(stderr,"HFTA::error:could instantiate a FTA");
276 // set the operator's parameters
277 if(oper->set_param_block(sz, (void*)value)) failed = true;;
280 fprintf(stderr,"HFTA::Low level FTA (%s) instanciation done\n", fta_name);
// Subscribe to the single child lfta instance.
281 _fta.stream_subscribed_cnt = 1;
282 _fta.stream_subscribed[0] = f;
// Install the UNOP callback table into the embedded struct FTA.
284 _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
285 _fta.free_fta = UNOP_HFTA_free_fta;
286 _fta.control_fta = UNOP_HFTA_control_fta;
287 _fta.accept_packet = UNOP_HFTA_accept_packet;
288 _fta.clock_fta = UNOP_HFTA_clock_fta;
// Destructor fragment (elided lines around it).
292 delete oper; // free operators memory
// flush() fragment: drain the operator and enqueue finalized results.
297 list<host_tuple> res;
298 if (!oper->flush(res)) {
301 // go through the list of returned tuples and finalize them
302 list<host_tuple>::iterator iter = res.begin();
303 while (iter != res.end()) {
304 host_tuple& tup = *iter;
306 // finalize the tuple
312 // append returned list to output_queue
313 output_queue.splice(output_queue.end(), res);
// Post queued tuples upstream; a tuple stays queued if posting fails
// (only popped on hfta_post_tuple() == 0).
317 while (!output_queue.empty()) {
318 host_tuple& tup = output_queue.front();
320 fprintf(stderr, "HFTA::about to post tuple\n");
322 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
324 output_queue.pop_front();
// FTA callback: tear down a UNOP hfta. Recovers the object from the FTA
// pointer and releases the child lfta instance it subscribed to.
337 gs_retval_t UNOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
338 UNOP_HFTA* ftap = (UNOP_HFTA*)fta; // deallocate the fta and call the destructor
339 // will be called on program exit
342 // free instance we are subscribed to
343 fta_free_instance(gscpipc_getftaid(), ftap->f, recursive);
// FTA callback: dispatch a control command. FLUSH is forwarded to the child
// lfta; LOAD_PARAMS first flushes the lfta, then loads the lfta's slice of
// the new parameter block, then notifies the local operator.
349 gs_retval_t UNOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
350 UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
352 if (command == FTA_COMMAND_FLUSH) {
353 // ask lfta to do the flush
354 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
357 } else if (command == FTA_COMMAND_LOAD_PARAMS) {
358 // flush the lfta first so no tuples computed under the old parameters remain
359 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_FLUSH, sz, value);
362 /* extract lfta param block from hfta param block */
363 list<param_block> param_list;
364 get_lfta_params(sz, value, param_list);
365 param_block param = param_list.front();
366 // load new parameters into lfta
367 fta_control(gscpipc_getftaid(), ftap->f, FTA_COMMAND_LOAD_PARAMS, param.block_length,param.data);
370 // notify the operator about the change of parameter
371 ftap->oper->set_param_block(sz, (void*)value);
373 } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
374 // we no longer use temp_status commands
375 // hearbeat mechanism is used instead
// FTA callback: a tuple arrived from the child lfta. Wraps it as a
// non-heap-resident host_tuple, handles temporal/EOF tuples, runs it through
// the operator, finalizes the results, and posts them upstream.
380 gs_retval_t UNOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
381 UNOP_HFTA* ftap = (UNOP_HFTA*)fta;
383 fprintf(stderr, "HFTA::accepted packet\n");
385 if (!length) /* ignore null tuples */
// 'temp' is the wrapping host_tuple; its declaration is on an elided line.
389 temp.tuple_size = length;
// The packet memory is owned by the transport, not the heap.
392 temp.heap_resident = false;
394 // pass the tuple to operator
395 list<host_tuple> res;
// Trace/heartbeat payload carried by temporal tuples.
397 fta_stat* tup_trace = NULL;
398 gs_uint32_t tup_trace_sz = 0;
399 gs_uint64_t trace_id = 0;
400 bool temp_tuple_received = false;
403 // if the tuple is temporal we need to extract the heartbeat payload
404 if (ftaschema_is_temporal_tuple(ftap->schema_handle, packet)) {
405 temp_tuple_received = true;
406 if (ftaschema_get_trace(ftap->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
407 fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
// EOF from the source: flush the operator and forward an EOF-marked
// temporal tuple upstream.
410 if (ftaschema_is_eof_tuple(ftap->schema_handle, packet)) {
411 /* perform a flush */
414 /* post eof_tuple to a parent */
415 host_tuple eof_tuple;
416 ftap->oper->get_temp_status(eof_tuple);
418 /* last byte of the tuple specifies the tuple type
419 * set it to EOF_TUPLE
421 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
422 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
// Normal path: hand the tuple to the operator.
427 ret = ftap->oper->accept_tuple(temp, res);
429 // go through the list of returned tuples and finalize them
430 list<host_tuple>::iterator iter = res.begin();
431 while (iter != res.end()) {
432 host_tuple& tup = *iter;
434 // finalize the tuple
440 // if we received temporal tuple, last tuple of the result must be temporal too
441 // we need to extend the trace by adding ftaid and send a heartbeat to clearinghouse
442 if (temp_tuple_received) {
444 host_tuple& temp_tup = res.back();
// Rebuild the temporal tuple: original data + trace_id + existing trace
// entries + one new fta_stat entry for this hfta.
447 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
448 char* new_data = (char*)malloc(new_tuple_size);
449 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
450 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
451 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
452 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)ftaid, sizeof(FTAID));
// NOTE(review): the memcpy below writes zeroed 'stats' at the SAME offset
// as the FTAID copied just above, overwriting it; the MULTOP variant
// instead fills stats.ftaid and copies only stats. Looks like a latent
// bug, benign while heartbeats are disabled -- TODO confirm.
454 memset((char*)&stats, 0, sizeof(fta_stat));
455 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
457 // Send a hearbeat message to clearinghouse.
458 // Disable sending heartbeats for now to avoid overloading clearinghouse
459 // fta_heartbeat(ftap->_fta.ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
// Swap the rebuilt buffer into the result tuple.
461 temp_tup.free_tuple();
462 temp_tup.data = new_data;
463 temp_tup.tuple_size = new_tuple_size;
466 // append returned list to output_queue
467 ftap->output_queue.splice(ftap->output_queue.end(), res);
// Post queued tuples; a tuple is only dequeued when posting succeeds.
470 while (!ftap->output_queue.empty()) {
471 host_tuple& tup = ftap->output_queue.front();
473 fprintf(stderr, "HFTA::about to post tuple\n");
475 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
477 ftap->output_queue.pop_front();
// FTA callback: periodic clock tick. Heartbeat emission is currently
// disabled, so this is effectively a no-op placeholder.
485 gs_retval_t UNOP_HFTA_clock_fta(struct FTA *ftap) {
487 // Send a hearbeat message to clearinghouse.to indicate we are alive
488 // Disable sending heartbeats for now to avoid overloading clearinghouse
489 // fta_heartbeat(ftap->ftaid, 0, 0, 0);
// Schema handle of the final (root) operator's output.
498 gs_schemahandle_t schema_handle;
// Operators in topological order; execution walks this vector leaves-first.
500 vector<operator_node*> sorted_nodes;
// Source lftas feeding this hfta.
502 list<lfta_info*> *lfta_list;
503 /* number of eof tuples we received so far
504 * receiving eof tuples from every source fta will cause a flush
// Tuples produced but not yet successfully posted upstream.
511 list<host_tuple> output_queue;
// Runtime statistics, reported via (currently disabled) heartbeats.
514 gs_uint32_t in_tuple_cnt;
515 gs_uint32_t out_tuple_cnt;
516 gs_uint32_t out_tuple_sz;
517 gs_uint64_t cycle_cnt;
519 gs_uint64_t trace_id;
521 // memory occupied by output queue
522 gs_uint32_t output_queue_mem;
533 MULTOP_HFTA(struct FTAID ftaid, gs_csp_t name, gs_int32_t command, gs_int32_t sz, void * value, gs_schemahandle_t sch_handle, operator_node* node,
534 list<lfta_info*> *lftas, bool fta_reusable, gs_uint32_t reuse_option) {
544 _fta.ftaid.streamid = (gs_p_t)this;
546 schema_handle = sch_handle;
548 output_queue_mem = 0;
550 // topologically sort the operators in the tree (or DAG)
551 // for DAG we make sure we add the node to the sorted list only once
552 operator_node* current_node;
553 map<operator_node*, int> node_map;
554 vector<operator_node*> node_list;
557 node_list.push_back(root);
562 while (i < node_list.size()) {
563 current_node = node_list[i];
564 if (current_node->left_child && node_map.find(current_node->left_child) == node_map.end()) {
565 node_map[current_node->left_child] = num_operators++;
566 node_list.push_back(current_node->left_child);
568 if (current_node->right_child && node_map.find(current_node->right_child) == node_map.end()) {
569 node_map[current_node->right_child] = num_operators++;
570 node_list.push_back(current_node->right_child);
576 // build adjacency lists for query DAG
577 list<int>* adj_lists = new list<int>[num_operators];
578 bool* leaf_flags = new bool[num_operators];
579 memset(leaf_flags, 0, num_operators * sizeof(bool));
580 for (i = 0; i < num_operators; ++i) {
581 current_node = node_list[i];
582 if (current_node->left_child) {
583 adj_lists[i].push_back(node_map[current_node->left_child]);
585 if (current_node->right_child && current_node->left_child != current_node->right_child) {
586 adj_lists[i].push_back(node_map[current_node->right_child]);
590 // run topolofical sort
591 bool leaf_found = true;
594 // add all leafs to sorted_nodes
595 for (i = 0; i < num_operators; ++i) {
596 if (!leaf_flags[i] && adj_lists[i].empty()) {
597 leaf_flags[i] = true;
598 sorted_nodes.push_back(node_list[i]);
601 // remove the node from its parents adjecency lists
602 for (int j = 0; j < num_operators; ++j) {
603 list<int>::iterator iter;
604 for (iter = adj_lists[j].begin(); iter != adj_lists[j].end(); iter++) {
606 adj_lists[j].erase(iter);
618 // set the parameter block for every operator in tree
619 for (i = 0; i < num_operators; ++i)
620 if(sorted_nodes[i]->op->set_param_block(sz, (void*)value)) failed = true;
623 fprintf(stderr,"Instantiate FTAs\n");
625 /* extract lfta param block from hfta param block */
626 // NOTE: param_list must line up with lfta_list
627 list<param_block> param_list;
628 get_lfta_params(sz, value, param_list);
629 list<param_block>::iterator iter1;
630 list<lfta_info*>::iterator iter2 = lfta_list->begin();
632 for (iter1 = param_list.begin(), i = 0; iter1 != param_list.end(); iter1++, iter2++, i++) {
633 lfta_info* inf = *iter2;
636 fprintf(stderr,"Instantiate a FTA\n");
639 gs_uint32_t reuse_flag = 2;
641 // we will try to create a new instance of child FTA only if it is parameterized
642 if ((*iter1).block_length > 0 && (reuse_option == 0 || (!fta_reusable && reuse_option == 2))) {
643 (*iter2)->f.streamid = 0; // not interested in existing instances
646 if (fta_alloc_instance(_fta.ftaid, &(*iter2)->f,(*iter2)->fta_name, (*iter2)->schema, reuse_flag, FTA_COMMAND_LOAD_PARAMS,(*iter1).block_length,(*iter1).data)!=0) {
647 fprintf(stderr,"HFTA::error:could instantiate a FTA");
654 //fprintf(stderr,"HFTA::Low level FTA instanciation done\n");
// Record the subscription to this child lfta instance.
656 _fta.stream_subscribed[i]=(*iter2)->f;
// After the loop, i equals the number of lftas instantiated.
658 _fta.stream_subscribed_cnt = i;
// Install the MULTOP callback table into the embedded struct FTA.
662 _fta.alloc_fta = NULL; // why should this be a part of the FTA (it is a factory function)
663 _fta.free_fta = MULTOP_HFTA_free_fta;
664 _fta.control_fta = MULTOP_HFTA_control_fta;
665 _fta.accept_packet = MULTOP_HFTA_accept_packet;
666 _fta.clock_fta = MULTOP_HFTA_clock_fta;
668 // init runtime stats
// Destructor fragment: walk the subscribed lftas (loop index 'i' is
// initialized on an elided line -- TODO confirm).
678 list<lfta_info*>::iterator iter;
681 for (iter = lfta_list->begin(); i < _fta.stream_subscribed_cnt; iter++, i++) {
685 delete root; // free operators memory
// flush() fragment: drain every operator in topological order, routing each
// node's output into its parents' input queues; the root's output goes to
// 'res', is finalized, and queued for posting.
693 list<host_tuple> res;
695 // go through the list of operators in topological order
697 list<host_tuple>::iterator iter;
698 list<host_tuple> temp_output_queue;
700 for (int i = 0; i < num_operators; ++i) {
701 operator_node* node = sorted_nodes[i];
// DAG variant: non-root output collects in temp_output_queue first.
704 list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? temp_output_queue : res;
706 // for trees we can put output tuples directly into parent's input buffer
// Tree variant (presumably the #else branch of an elided #ifdef).
707 list<host_tuple>& current_output_queue = (i < (num_operators - 1)) ? node->parent->input_queue : res;
709 // consume tuples waiting in your queue
710 for (iter = node->input_queue.begin(); iter != node->input_queue.end(); iter++) {
711 node->op->accept_tuple(*(iter), current_output_queue);
713 node->op->flush(current_output_queue);
714 node->input_queue.clear();
717 // copy the tuples from output queue into input queues of all parents
718 list<operator_node*>::iterator node_iter;
720 if (!node->parent_list.empty()) {
721 // append the content of the output queue to parent input queue
723 for (iter = temp_output_queue.begin(); iter != temp_output_queue.end(); iter++) {
// Share one tuple among several parents via a reference count.
725 if (node->parent_list.size() > 1) {
726 ref_cnt = (int*)malloc(sizeof(int));
727 *ref_cnt = node->parent_list.size() - 1;
730 for (node_iter = node->parent_list.begin(); node_iter != node->parent_list.end(); node_iter++) {
731 (*iter).ref_cnt = ref_cnt;
732 (*node_iter)->input_queue.push_back(*iter);
740 // go through the list of returned tuples and finalize them
741 list<host_tuple>::iterator iter = res.begin();
742 while (iter != res.end()) {
743 host_tuple& tup = *iter;
745 // finalize the tuple
// Track memory held by queued output tuples.
749 output_queue_mem += tup.tuple_size;
753 // append returned list to output_queue
754 output_queue.splice(output_queue.end(), res);
// Post queued tuples; only dequeued (and accounted) when posting succeeds.
758 while (!output_queue.empty()) {
759 host_tuple& tup = output_queue.front();
761 fprintf(stderr, "HFTA::about to post tuple\n");
763 if (hfta_post_tuple(&_fta,tup.tuple_size,tup.data) == 0) {
764 output_queue_mem -= tup.tuple_size;
766 output_queue.pop_front();
// FTA callback: tear down a MULTOP hfta and release every child lfta
// instance it subscribed to.
779 gs_retval_t MULTOP_HFTA_free_fta (struct FTA *fta, gs_uint32_t recursive) {
780 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta; // deallocate the fta and call the destructor
781 // will be called on program exit
784 // free instance we are subscribed to
785 list<lfta_info*>::iterator iter;
// Loop index 'i' is initialized on an elided line -- TODO confirm.
788 for (iter = ftap->lfta_list->begin(); i < fta->stream_subscribed_cnt; iter++, i++) {
789 fta_free_instance(gscpipc_getftaid(), (*iter)->f, recursive);
// FTA callback: dispatch a control command to every child lfta and to every
// local operator. LOAD_PARAMS flushes each lfta before loading its slice of
// the new parameter block.
797 gs_retval_t MULTOP_HFTA_control_fta (struct FTA *fta, gs_int32_t command, gs_int32_t sz, void * value) {
798 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
800 if (command == FTA_COMMAND_FLUSH) {
802 // ask lftas to do the flush
803 list<lfta_info*>::iterator iter;
804 for (iter = ftap->lfta_list->begin(); iter != ftap->lfta_list->end(); iter++) {
805 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, sz, value);
807 // flush hfta operators
810 } else if (command == FTA_COMMAND_LOAD_PARAMS) {
// Split the hfta param block into one block per lfta (order matches
// lfta_list -- see the constructor's note).
812 list<param_block> param_list;
813 get_lfta_params(sz, value, param_list);
815 // ask lftas to do the flush and set new parameters
816 list<lfta_info*>::iterator iter;
817 list<param_block>::iterator iter2;
818 for (iter = ftap->lfta_list->begin(), iter2 = param_list.begin(); iter != ftap->lfta_list->end(); iter++, iter2++) {
819 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_FLUSH, 0, NULL);
820 fta_control(gscpipc_getftaid(), (*iter)->f, FTA_COMMAND_LOAD_PARAMS, (*iter2).block_length,(*iter2).data);
823 // flush hfta operators
826 // set the new parameter block for every operator in tree
827 for (int i = 0; i < ftap->num_operators; ++i)
828 ftap->sorted_nodes[i]->op->set_param_block(sz, (void*)value);
830 } else if (command == FTA_COMMAND_GET_TEMP_STATUS) {
831 // we no longer use temp_status commands
832 // hearbeat mechanism is used instead
// FTA callback: a tuple arrived from one of the child lftas. Identifies the
// source lfta, handles temporal/EOF tuples, routes the tuple through the
// operator tree/DAG, and posts finalized results upstream. Also accumulates
// runtime stats (tuple counts, rdtsc cycle count).
837 gs_retval_t MULTOP_HFTA_accept_packet (struct FTA *fta, FTAID *ftaid, void * packet, gs_int32_t length) {
838 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
840 gs_uint64_t start_cycle = rdtsc();
842 fprintf(stderr, "HFTA::accepted packet\n");
844 if (!length) /* ignore null tuples */
847 ftap->in_tuple_cnt++;
// 'temp' is the wrapping host_tuple; its declaration is on an elided line.
850 temp.tuple_size = length;
// The packet memory is owned by the transport, not the heap.
853 temp.heap_resident = false;
855 // fprintf(stderr,"created temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
857 // find from which lfta the tuple came
858 list<lfta_info*>::iterator iter;
859 lfta_info* inf = NULL;
// Match the sender's FTAID against our subscription table.
862 for (i = 0, iter = ftap->lfta_list->begin(); i < ftap->_fta.stream_subscribed_cnt; iter++, i++) {
863 if (ftap->_fta.stream_subscribed[i].ip == ftaid->ip &&
864 ftap->_fta.stream_subscribed[i].port == ftaid->port &&
865 ftap->_fta.stream_subscribed[i].index == ftaid->index &&
866 ftap->_fta.stream_subscribed[i].streamid == ftaid->streamid) {
873 fprintf(stderr,"HFTA::error:received tuple from unknown stream\n");
877 // route tuple through operator chain
878 list<host_tuple> result;
882 temp.channel = inf->output_channel;
884 operator_node* current_node = NULL, *child = NULL;
885 list<host_tuple> temp_output_queue;
// Trace/heartbeat payload carried by temporal tuples.
888 fta_stat* tup_trace = NULL;
889 gs_uint32_t tup_trace_sz = 0;
890 gs_uint64_t trace_id = 0;
891 bool temp_tuple_received = false;
893 // if the tuple is temporal we need to extract the heartbeat payload
894 if (ftaschema_is_temporal_tuple(inf->schema_handle, packet)) {
895 temp_tuple_received = true;
896 if (ftaschema_get_trace(inf->schema_handle, packet, length, &trace_id, &tup_trace_sz, &tup_trace))
897 fprintf(stderr, "HFTA error: received temporal status tuple with no trace\n");
// EOF handling: only flush once every source lfta has delivered its EOF.
900 if (ftaschema_is_eof_tuple(inf->schema_handle, packet)) {
902 if (++ftap->num_eof_tuples < ftap->lfta_list->size())
905 ftap->num_eof_tuples = 0;
907 /* perform a flush */
910 /* post eof_tuple to a parent */
911 host_tuple eof_tuple;
// Ask the root operator for a temporal status tuple to carry the EOF mark.
912 ftap->sorted_nodes[ftap->num_operators - 1]->op->get_temp_status(eof_tuple);
914 /* last byte of the tuple specifies the tuple type
915 * set it to EOF_TUPLE
917 *((char*)eof_tuple.data + (eof_tuple.tuple_size - sizeof(char))) = EOF_TUPLE;
918 hfta_post_tuple(fta,eof_tuple.tuple_size,eof_tuple.data);
919 ftap->out_tuple_cnt++;
920 ftap->out_tuple_sz+=eof_tuple.tuple_size;
925 list<host_tuple>::iterator iter2;
929 // push tuple to all parent operators of the lfta
// DAG routing: deliver the incoming tuple to every parent of the source
// lfta on that parent's channel, then drive the operators in topological
// order, fanning each node's output to all of its parents.
930 list<operator_node*>::iterator node_iter;
931 list<unsigned>::iterator chan_iter;
932 for (node_iter = inf->parent_list.begin(), chan_iter = inf->out_channel_list.begin(); node_iter != inf->parent_list.end(); node_iter++, chan_iter++) {
933 temp.channel = *chan_iter;
934 (*node_iter)->input_queue.push_back(temp);
937 for (i = 0; i < ftap->num_operators; ++i) {
939 operator_node* node = ftap->sorted_nodes[i];
// The root (last in topological order) writes into 'result'.
940 list<host_tuple>& current_output_queue = (i < (ftap->num_operators - 1)) ? temp_output_queue : result;
942 // consume tuples waiting in your queue
943 for (iter2 = node->input_queue.begin(); iter2 != node->input_queue.end(); iter2++) {
944 node->op->accept_tuple(*(iter2), current_output_queue);
946 node->input_queue.clear();
948 // copy the tuples from output queue into input queues of all parents
950 if (!node->parent_list.empty()) {
952 // append the content of the output queue to parent input queue
953 for (iter2 = temp_output_queue.begin(); iter2 != temp_output_queue.end(); iter2++) {
// Share one tuple among several parents via a reference count.
956 if (node->parent_list.size() > 1) {
957 ref_cnt = (int*)malloc(sizeof(int));
958 *ref_cnt = node->parent_list.size() - 1;
961 for (node_iter = node->parent_list.begin(), chan_iter = node->out_channel_list.begin(); node_iter != node->parent_list.end(); node_iter++, chan_iter++) {
962 (*iter2).ref_cnt = ref_cnt;
963 (*iter2).channel = *chan_iter;
964 (*node_iter)->input_queue.push_back(*iter2);
968 temp_output_queue.clear();
// Tree routing variant (presumably the #else branch of an elided #ifdef):
// walk straight up the parent chain instead of the topological order.
971 current_node = inf->parent;
973 // fprintf(stderr,"Pushing temp, channel=%d, resident=%d\n",temp.channel, (int)temp.heap_resident);
974 current_node->input_queue.push_back(temp);
977 //fprintf(stderr,"Routing tuple, current node is %d, parent is %d\n",current_node,current_node->parent);
978 list<host_tuple>& current_output_queue = (current_node->parent) ? current_node->parent->input_queue : result;
980 // consume tuples waiting in your queue
981 for (iter2 = current_node->input_queue.begin(); iter2 != current_node->input_queue.end(); iter2++) {
982 current_node->op->accept_tuple((*iter2),current_output_queue);
984 // All consumed, delete them
985 current_node->input_queue.clear();
986 current_node = current_node->parent;
988 } while (current_node);
994 bool no_temp_tuple = false;
996 // if we received temporal tuple, last tuple of the result must be temporal too
997 // we need to extend the trace by adding ftaid and send a hearbeat to clearinghouse
998 if (temp_tuple_received) {
// No result tuples means there is no temporal tuple to extend.
1000 if (result.empty()) {
1001 no_temp_tuple = true;
// 'temp_tup' is declared on an elided line; note this is a COPY of the
// back element, unlike the UNOP variant which takes a reference.
1005 temp_tup = result.back();
1006 finalize_tuple(temp_tup);
// Rebuild the temporal tuple: original data + trace_id + existing trace
// entries + one new fta_stat entry carrying this hfta's runtime stats.
1008 int new_tuple_size = temp_tup.tuple_size + sizeof(gs_uint64_t) + (tup_trace_sz + 1)* sizeof(fta_stat);
1009 char* new_data = (char*)malloc(new_tuple_size);
1010 memcpy(new_data, temp_tup.data, temp_tup.tuple_size);
1011 memcpy(new_data + temp_tup.tuple_size, &trace_id, sizeof(gs_uint64_t));
1012 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t), (char*)tup_trace, tup_trace_sz * sizeof(fta_stat));
1015 memset((char*)&stats, 0, sizeof(fta_stat));
1016 stats.ftaid = fta->ftaid;
1017 stats.in_tuple_cnt = ftap->in_tuple_cnt;
1018 stats.out_tuple_cnt = ftap->out_tuple_cnt;
1019 stats.out_tuple_sz = ftap->out_tuple_sz;
1020 stats.cycle_cnt = ftap->cycle_cnt;
1021 memcpy(new_data + temp_tup.tuple_size + sizeof(gs_uint64_t) + tup_trace_sz * sizeof(fta_stat), (char*)&stats, sizeof(fta_stat));
1023 // Send a hearbeat message to clearinghouse.
1024 // Disable sending heartbeats for now to avoid overloading clearinghouse
1025 // fta_heartbeat(fta->ftaid, trace_id, tup_trace_sz + 1, (fta_stat *)((char*)new_data + temp_tup.tuple_size + sizeof(gs_uint64_t)));
// Stats were captured into the trace entry; reset the counters.
1028 ftap->in_tuple_cnt = 0;
1029 ftap->out_tuple_cnt = 0;
1030 ftap->out_tuple_sz = 0;
1031 ftap->cycle_cnt = 0;
// Swap the rebuilt buffer into the (copied) temporal tuple.
1033 free(temp_tup.data);
1034 temp_tup.data = new_data;
1035 temp_tup.tuple_size = new_tuple_size;
1040 // go through the list of returned tuples and finalize them
1041 // since we can produce multiple temporal tuples in DAG plans
1042 // we can drop all of them except the last one
1043 iter2 = result.begin();
1044 while(iter2 != result.end()) {
1045 host_tuple tup = *iter2;
1047 // finalize the tuple
1048 if (tup.tuple_size) {
1049 finalize_tuple(tup);
// Intermediate temporal tuples are skipped (elided lines presumably
// continue here); only the last one (temp_tup) is kept.
1052 if (ftaschema_is_temporal_tuple(ftap->schema_handle, tup.data))
1057 ftap->output_queue.push_back(tup);
1058 ftap->output_queue_mem += tup.tuple_size;
1065 // append returned list to output_queue
1066 // ftap->output_queue.splice(ftap->output_queue.end(), result);
// Queue the rebuilt temporal tuple last, preserving temporal ordering.
1068 if (temp_tuple_received && !no_temp_tuple) {
1069 ftap->output_queue.push_back(temp_tup);
1070 ftap->output_queue_mem += temp_tup.tuple_size;
// Post queued tuples; only dequeued (and accounted) when posting succeeds.
1074 while (!ftap->output_queue.empty()) {
1075 host_tuple& tup = ftap->output_queue.front();
1077 fprintf(stderr, "HFTA::about to post tuple\n");
1079 if (hfta_post_tuple(fta,tup.tuple_size,tup.data) == 0) {
1080 ftap->out_tuple_cnt++;
1081 ftap->out_tuple_sz+=tup.tuple_size;
1082 ftap->output_queue_mem -= tup.tuple_size;
1084 ftap->output_queue.pop_front();
// Accumulate cycles spent in this callback.
1089 ftap->cycle_cnt += rdtsc() - start_cycle;
// FTA callback: periodic clock tick. Dumps runtime stats and per-operator
// memory footprints to stderr, fills an fta_stat for the (currently
// disabled) heartbeat, and resets the counters.
1094 gs_retval_t MULTOP_HFTA_clock_fta(struct FTA *fta) {
1095 MULTOP_HFTA* ftap = (MULTOP_HFTA*)fta;
1099 fprintf(stderr, "FTA = %s|", ftap->fta_name);
1100 fprintf(stderr, "in_tuple_cnt = %u|", ftap->in_tuple_cnt);
1101 fprintf(stderr, "out_tuple_cnt = %u|", ftap->out_tuple_cnt);
1102 fprintf(stderr, "out_tuple_sz = %u|", ftap->out_tuple_sz);
1103 fprintf(stderr, "cycle_cnt = %llu|", ftap->cycle_cnt);
// Per-operator memory report (assumes num_operators >= 1).
1106 fprintf(stderr, "mem_footprint %s = %d", ftap->sorted_nodes[0]->op->get_name(), ftap->sorted_nodes[0]->op->get_mem_footprint());
1107 unsigned int total_mem = ftap->sorted_nodes[0]->op->get_mem_footprint();
1109 for (int i = 1; i < ftap->num_operators; ++i) {
1110 operator_node* node = ftap->sorted_nodes[i];
1111 fprintf(stderr, ",%s = %d", node->op->get_name(), node->op->get_mem_footprint());
1112 total_mem += node->op->get_mem_footprint();
1114 fprintf(stderr, ", total = %d|", total_mem );
1115 fprintf(stderr, "output_buffer_size = %d\n", ftap->output_queue_mem );
// Snapshot the stats for the heartbeat payload ('stats' is declared on an
// elided line).
1119 memset((char*)&stats, 0, sizeof(fta_stat));
1120 stats.ftaid = fta->ftaid;
1121 stats.in_tuple_cnt = ftap->in_tuple_cnt;
1122 stats.out_tuple_cnt = ftap->out_tuple_cnt;
1123 stats.out_tuple_sz = ftap->out_tuple_sz;
1124 stats.cycle_cnt = ftap->cycle_cnt;
1126 // Send a hearbeat message to clearinghouse.
1127 // Disable sending heartbeats for now to avoid overloading clearinghouse
1128 // fta_heartbeat(fta->ftaid, ftap->trace_id++, 1, &stats);
1130 // resets runtime stats
1131 ftap->in_tuple_cnt = 0;
1132 ftap->out_tuple_cnt = 0;
1133 ftap->out_tuple_sz = 0;
1134 ftap->cycle_cnt = 0;