-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#ifndef MERGE_OPERATOR_H
-#define MERGE_OPERATOR_H
-
-#include "host_tuple.h"
-#include "base_operator.h"
-#include <list>
-using namespace std;
-
-#include <stdio.h>
-
-
-//int last_tb = 0;
-
-template <class merge_functor>
-class merge_operator : public base_operator {
-private :
- // input channel on which we want to receive the next tuple
- // value -1 indicates that we never received a single tuple
- // and we are not concerned on which channel tuple will arrive
- int wait_channel;
-
- // list of tuples from one of the channel waiting to be compared
- // against tuple from the other channel
- list<host_tuple> merge_queue;
- int merge_qsize;
-
- // comparator object used to compare the tuples
- merge_functor func;
-
- // soft limit on queue size - we consider operator blocked on its input
- // whenever we reach this soft limit (not used anymore)
- size_t soft_queue_size_limit;
-
- int dropped_cnt;
-
- // memory footprint for the merge queue in bytes
- unsigned int queue_mem;
-
- // appends tuple to the end of the merge_queue
- // if tuple is stack resident, makes it heap resident
- void append_tuple(host_tuple& tup) {
- if (!tup.heap_resident) {
- char* data = (char*)malloc(tup.tuple_size);
- memcpy(data, tup.data, tup.tuple_size);
- tup.data = data;
- tup.heap_resident = true;
- }
- merge_queue.push_back(tup);
- merge_qsize++;
- queue_mem += tup.tuple_size;
- }
-
- void purge_queue(int channel, list<host_tuple>& result){
- if (merge_queue.empty())
- return;
- host_tuple top_tuple = merge_queue.front();
- // remove all the tuple smaller than arrived tuple
- while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {
- merge_queue.pop_front();
- merge_qsize--;
- queue_mem -= top_tuple.tuple_size;
- if(merge_qsize<0) abort();
- func.update_stored_temp_status(top_tuple,channel);
- func.xform_tuple(top_tuple);
- top_tuple.channel = output_channel;
- result.push_back(top_tuple);
-
- if (merge_queue.empty())
- break;
-
- top_tuple = merge_queue.front();
- func.update_stored_temp_status(top_tuple,channel);
- }
- }
-
-
-
-public:
-
- merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
- wait_channel = -1;
- soft_queue_size_limit = size_limit;
- merge_qsize = 0;
- dropped_cnt = 0;
- queue_mem = 0;
- }
-
-
- int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
- int last_channel;
-
- if (tup.channel > 1) {
- fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
- return 0;
- }
-
- /* reject "out of order" tuple - we can receive those after forced flush */
- func.get_timestamp(tup);
- int res = func.compare_with_temp_status(tup.channel);
- bool is_temp_tuple = func.temp_status_received(tup);
-
-/*
-// Ignore temp tuples until we can fix their timestamps.
-if(is_temp_tuple){
- tup.free_tuple();
- return 0;
-}*/
- if (res < 0){ // out of order tuple
-
- if (++dropped_cnt % 100000 == 0) {
- gslog(LOG_ALERT, "%d tuples dropped by %s merge\n", dropped_cnt,get_name());
- }
- //if(func.print_warnings())
- // fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);
- // free tuple memory
- tup.free_tuple();
- return 0;
- } else {
-
- if (wait_channel < 0 || wait_channel != tup.channel) {
- if (wait_channel < 0)
- func.update_temp_status(tup);
-
- if (!is_temp_tuple){
- if(func.compare_with_temp_status(1-tup.channel) <= 0){
- if (!tup.heap_resident) {
- char* data = (char*)malloc(tup.tuple_size);
- memcpy(data, tup.data, tup.tuple_size);
- tup.data = data;
- tup.heap_resident = true;
- }
- func.xform_tuple(tup);
- tup.channel = output_channel;
- result.push_back(tup);
- }else{
- append_tuple(tup); // put arrived tuple in the queue
- }
- }
-// func.update_temp_status_by_slack(tup, 1-tup.channel);
-// purge_queue(tup.channel, result);
-
- wait_channel = 1-tup.channel;
- }else{
- // If possible, clear tuples from the other queue.
- func.update_temp_status(tup);
- purge_queue(1-tup.channel, result);
-
- if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple
- if (!is_temp_tuple) {
- if (!tup.heap_resident) {
- char* data = (char*)malloc(tup.tuple_size);
- memcpy(data, tup.data, tup.tuple_size);
- tup.data = data;
- tup.heap_resident = true;
- }
- func.xform_tuple(tup);
- tup.channel = output_channel;
- result.push_back(tup);
- }
- }
- else {
- if (!is_temp_tuple)
- append_tuple(tup); // put arrived tuple in the queue
- wait_channel = 1 - tup.channel; // now we want the tuple from other channel
- }
- }
-
- }
-
- // temp status tuples emited by merge don't serve any other purpose
- // other than tracing the flow of tuples
- if (is_temp_tuple) {
- host_tuple temp_tup;
- if (!func.create_temp_status_tuple(temp_tup)) {
- temp_tup.channel = output_channel;
- result.push_back(temp_tup);
- }
- // clear memory of heap-resident temporal tuples
- tup.free_tuple();
- }
-
- if (!merge_qsize)
- wait_channel = -1;
-
- return 0;
- }
-
- int flush(list<host_tuple>& result) {
-
- if (merge_queue.empty())
- return 0;
-
- host_tuple top_tuple;
- list<host_tuple>::iterator iter;
- for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
- top_tuple = *iter;
- func.update_stored_temp_status(top_tuple,top_tuple.channel);
- func.xform_tuple(top_tuple);
- top_tuple.channel = output_channel;
- result.push_back(top_tuple);
- }
-
- queue_mem = 0;
-
- return 0;
- }
-
- int set_param_block(int sz, void * value) {
- return 0;
- }
-
-
- int get_temp_status(host_tuple& result) {
- result.channel = output_channel;
- return func.create_temp_status_tuple(result);
- }
-
-
- int get_blocked_status () {
- if (merge_qsize> soft_queue_size_limit)
- return wait_channel;
- else
- return -1;
- }
-
- unsigned int get_mem_footprint() {
- return queue_mem;
- }
-
-};
-
-#endif // MERGE_OPERATOR_H
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#ifndef MERGE_OPERATOR_H\r
+#define MERGE_OPERATOR_H\r
+\r
+#include "host_tuple.h"\r
+#include "base_operator.h"\r
+#include <list>\r
+using namespace std;\r
+\r
+#include <stdio.h>\r
+\r
+\r
+//int last_tb = 0;\r
+\r
+template <class merge_functor>\r
+class merge_operator : public base_operator {\r
+private :\r
+ // input channel on which we want to receive the next tuple\r
+ // value -1 indicates that we never received a single tuple\r
+ // and we are not concerned on which channel tuple will arrive\r
+ int wait_channel;\r
+\r
+ // list of tuples from one of the channel waiting to be compared\r
+ // against tuple from the other channel\r
+ list<host_tuple> merge_queue;\r
+ int merge_qsize;\r
+\r
+ // comparator object used to compare the tuples\r
+ merge_functor func;\r
+\r
+ // soft limit on queue size - we consider operator blocked on its input\r
+ // whenever we reach this soft limit (not used anymore)\r
+ size_t soft_queue_size_limit;\r
+\r
+ int dropped_cnt;\r
+\r
+ // memory footprint for the merge queue in bytes\r
+ unsigned int queue_mem;\r
+\r
+ // appends tuple to the end of the merge_queue\r
+ // if tuple is stack resident, makes it heap resident\r
+ void append_tuple(host_tuple& tup) {\r
+ if (!tup.heap_resident) {\r
+ char* data = (char*)malloc(tup.tuple_size);\r
+ memcpy(data, tup.data, tup.tuple_size);\r
+ tup.data = data;\r
+ tup.heap_resident = true;\r
+ }\r
+ merge_queue.push_back(tup);\r
+ merge_qsize++;\r
+ queue_mem += tup.tuple_size;\r
+ }\r
+\r
+ void purge_queue(int channel, list<host_tuple>& result){\r
+ if (merge_queue.empty())\r
+ return;\r
+ host_tuple top_tuple = merge_queue.front();\r
+ // remove all the tuple smaller than arrived tuple\r
+ while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {\r
+ merge_queue.pop_front();\r
+ merge_qsize--;\r
+ queue_mem -= top_tuple.tuple_size;\r
+ if(merge_qsize<0) abort();\r
+ func.update_stored_temp_status(top_tuple,channel);\r
+ func.xform_tuple(top_tuple);\r
+ top_tuple.channel = output_channel;\r
+ result.push_back(top_tuple);\r
+\r
+ if (merge_queue.empty())\r
+ break;\r
+\r
+ top_tuple = merge_queue.front();\r
+ func.update_stored_temp_status(top_tuple,channel);\r
+ }\r
+ }\r
+\r
+\r
+\r
+public:\r
+\r
+ merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {\r
+ wait_channel = -1;\r
+ soft_queue_size_limit = size_limit;\r
+ merge_qsize = 0;\r
+ dropped_cnt = 0;\r
+ queue_mem = 0;\r
+ }\r
+\r
+\r
+ int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
+ int last_channel;\r
+\r
+ if (tup.channel > 1) {\r
+ fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);\r
+ return 0;\r
+ }\r
+\r
+ /* reject "out of order" tuple - we can receive those after forced flush */\r
+ func.get_timestamp(tup);\r
+ int res = func.compare_with_temp_status(tup.channel);\r
+ bool is_temp_tuple = func.temp_status_received(tup);\r
+\r
+/*\r
+// Ignore temp tuples until we can fix their timestamps.\r
+if(is_temp_tuple){\r
+ tup.free_tuple();\r
+ return 0;\r
+}*/\r
+ if (res < 0){ // out of order tuple\r
+\r
+ if (++dropped_cnt % 100000 == 0) {\r
+ gslog(LOG_ALERT, "%d tuples dropped by %s merge\n", dropped_cnt,get_name());\r
+ }\r
+ //if(func.print_warnings())\r
+ // fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);\r
+ // free tuple memory\r
+ tup.free_tuple();\r
+ return 0;\r
+ } else {\r
+\r
+ if (wait_channel < 0 || wait_channel != tup.channel) {\r
+ if (wait_channel < 0)\r
+ func.update_temp_status(tup);\r
+\r
+ if (!is_temp_tuple){\r
+ if(func.compare_with_temp_status(1-tup.channel) <= 0){\r
+ if (!tup.heap_resident) {\r
+ char* data = (char*)malloc(tup.tuple_size);\r
+ memcpy(data, tup.data, tup.tuple_size);\r
+ tup.data = data;\r
+ tup.heap_resident = true;\r
+ }\r
+ func.xform_tuple(tup);\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }else{\r
+ append_tuple(tup); // put arrived tuple in the queue\r
+ }\r
+ }\r
+// func.update_temp_status_by_slack(tup, 1-tup.channel);\r
+// purge_queue(tup.channel, result);\r
+\r
+ wait_channel = 1-tup.channel;\r
+ }else{\r
+ // If possible, clear tuples from the other queue.\r
+ func.update_temp_status(tup);\r
+ purge_queue(1-tup.channel, result);\r
+\r
+ if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple\r
+ if (!is_temp_tuple) {\r
+ if (!tup.heap_resident) {\r
+ char* data = (char*)malloc(tup.tuple_size);\r
+ memcpy(data, tup.data, tup.tuple_size);\r
+ tup.data = data;\r
+ tup.heap_resident = true;\r
+ }\r
+ func.xform_tuple(tup);\r
+ tup.channel = output_channel;\r
+ result.push_back(tup);\r
+ }\r
+ }\r
+ else {\r
+ if (!is_temp_tuple)\r
+ append_tuple(tup); // put arrived tuple in the queue\r
+ wait_channel = 1 - tup.channel; // now we want the tuple from other channel\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+ // temp status tuples emited by merge don't serve any other purpose\r
+ // other than tracing the flow of tuples\r
+ if (is_temp_tuple) {\r
+ host_tuple temp_tup;\r
+ if (!func.create_temp_status_tuple(temp_tup)) {\r
+ temp_tup.channel = output_channel;\r
+ result.push_back(temp_tup);\r
+ }\r
+ // clear memory of heap-resident temporal tuples\r
+ tup.free_tuple();\r
+ }\r
+\r
+ if (!merge_qsize)\r
+ wait_channel = -1;\r
+\r
+ return 0;\r
+ }\r
+\r
+ int flush(list<host_tuple>& result) {\r
+\r
+ if (merge_queue.empty())\r
+ return 0;\r
+\r
+ host_tuple top_tuple;\r
+ list<host_tuple>::iterator iter;\r
+ for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {\r
+ top_tuple = *iter;\r
+ func.update_stored_temp_status(top_tuple,top_tuple.channel);\r
+ func.xform_tuple(top_tuple);\r
+ top_tuple.channel = output_channel;\r
+ result.push_back(top_tuple);\r
+ }\r
+\r
+ queue_mem = 0;\r
+\r
+ return 0;\r
+ }\r
+\r
+ int set_param_block(int sz, void * value) {\r
+ return 0;\r
+ }\r
+\r
+\r
+ int get_temp_status(host_tuple& result) {\r
+ result.channel = output_channel;\r
+ return func.create_temp_status_tuple(result);\r
+ }\r
+\r
+\r
+ int get_blocked_status () {\r
+ if (merge_qsize> soft_queue_size_limit)\r
+ return wait_channel;\r
+ else\r
+ return -1;\r
+ }\r
+\r
+ unsigned int get_mem_footprint() {\r
+ return queue_mem;\r
+ }\r
+\r
+};\r
+\r
+#endif // MERGE_OPERATOR_H\r