Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / merge_operator.h
index 68d2f5f..662fd32 100644 (file)
-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
-   Licensed under the Apache License, Version 2.0 (the "License");\r
-   you may not use this file except in compliance with the License.\r
-   You may obtain a copy of the License at\r
-\r
-     http://www.apache.org/licenses/LICENSE-2.0\r
-\r
-   Unless required by applicable law or agreed to in writing, software\r
-   distributed under the License is distributed on an "AS IS" BASIS,\r
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-   See the License for the specific language governing permissions and\r
-   limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#ifndef MERGE_OPERATOR_H\r
-#define MERGE_OPERATOR_H\r
-\r
-#include "host_tuple.h"\r
-#include "base_operator.h"\r
-#include <list>\r
-using namespace std;\r
-\r
-#include <stdio.h>\r
-\r
-\r
-//int last_tb = 0;\r
-\r
-template <class merge_functor>\r
-class merge_operator : public base_operator {\r
-private :\r
-       // input channel on which we want to receive the next tuple\r
-       // value -1 indicates that we never received a single tuple\r
-       // and we are not concerned on which channel tuple will arrive\r
-       int wait_channel;\r
-\r
-       // list of tuples from one of the channel waiting to be compared\r
-       // against tuple from the other channel\r
-       list<host_tuple> merge_queue;\r
-       int merge_qsize;\r
-\r
-       // comparator object used to compare the tuples\r
-       merge_functor func;\r
-\r
-       // soft limit on queue size - we consider operator blocked on its input\r
-       // whenever we reach this soft limit (not used anymore)\r
-       size_t soft_queue_size_limit;\r
-\r
-       int dropped_cnt;\r
-\r
-       // memory footprint for the merge queue in bytes\r
-       unsigned int queue_mem;\r
-\r
-       // appends tuple to the end of the merge_queue\r
-       // if tuple is stack resident, makes it heap resident\r
-       void append_tuple(host_tuple& tup) {\r
-               if (!tup.heap_resident) {\r
-                       char* data = (char*)malloc(tup.tuple_size);\r
-                       memcpy(data, tup.data, tup.tuple_size);\r
-                       tup.data = data;\r
-                       tup.heap_resident = true;\r
-               }\r
-               merge_queue.push_back(tup);\r
-               merge_qsize++;\r
-               queue_mem += tup.tuple_size;\r
-       }\r
-\r
-       void purge_queue(int channel, list<host_tuple>& result){\r
-               if (merge_queue.empty())\r
-                       return;\r
-               host_tuple top_tuple = merge_queue.front();\r
-               // remove all the tuple smaller than arrived tuple\r
-               while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {\r
-                       merge_queue.pop_front();\r
-                       merge_qsize--;\r
-                       queue_mem -= top_tuple.tuple_size;\r
-                       if(merge_qsize<0) abort();\r
-                       func.update_stored_temp_status(top_tuple,channel);\r
-                       func.xform_tuple(top_tuple);\r
-                       top_tuple.channel = output_channel;\r
-                       result.push_back(top_tuple);\r
-\r
-                       if (merge_queue.empty())\r
-                               break;\r
-\r
-                       top_tuple = merge_queue.front();\r
-                       func.update_stored_temp_status(top_tuple,channel);\r
-               }\r
-       }\r
-\r
-\r
-\r
-public:\r
-\r
-       merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {\r
-               wait_channel = -1;\r
-               soft_queue_size_limit = size_limit;\r
-               merge_qsize = 0;\r
-               dropped_cnt = 0;\r
-               queue_mem = 0;\r
-       }\r
-\r
-\r
-       int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
-               int last_channel;\r
-\r
-               if (tup.channel > 1) {\r
-                       fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);\r
-                       return 0;\r
-               }\r
-\r
-               /* reject "out of order" tuple - we can receive those after forced flush */\r
-               func.get_timestamp(tup);\r
-               int res = func.compare_with_temp_status(tup.channel);\r
-               bool is_temp_tuple = func.temp_status_received(tup);\r
-\r
-/*\r
-//                     Ignore temp tuples until we can fix their timestamps.\r
-if(is_temp_tuple){\r
- tup.free_tuple();\r
- return 0;\r
-}*/\r
-               if (res < 0){   // out of order tuple\r
-\r
-                       if (++dropped_cnt % 100000 == 0) {\r
-                               gslog(LOG_ALERT, "%d tuples dropped by %s  merge\n", dropped_cnt,get_name());\r
-                       }\r
-                       //if(func.print_warnings())\r
-                       //      fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);\r
-                       // free tuple memory\r
-                       tup.free_tuple();\r
-                       return 0;\r
-               } else {\r
-\r
-                       if (wait_channel < 0 || wait_channel != tup.channel) {\r
-                               if (wait_channel < 0)\r
-                                       func.update_temp_status(tup);\r
-\r
-                               if (!is_temp_tuple){\r
-                                       if(func.compare_with_temp_status(1-tup.channel) <= 0){\r
-                                               if (!tup.heap_resident) {\r
-                                                       char* data = (char*)malloc(tup.tuple_size);\r
-                                                       memcpy(data, tup.data, tup.tuple_size);\r
-                                                       tup.data = data;\r
-                                                       tup.heap_resident = true;\r
-                                               }\r
-                                               func.xform_tuple(tup);\r
-                                               tup.channel = output_channel;\r
-                                               result.push_back(tup);\r
-                                       }else{\r
-                                               append_tuple(tup);              // put arrived tuple in the queue\r
-                                       }\r
-                               }\r
-//                             func.update_temp_status_by_slack(tup, 1-tup.channel);\r
-//                             purge_queue(tup.channel, result);\r
-\r
-                               wait_channel = 1-tup.channel;\r
-                       }else{\r
-       //                              If possible, clear tuples from the other queue.\r
-                               func.update_temp_status(tup);\r
-                               purge_queue(1-tup.channel, result);\r
-\r
-                               if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple\r
-                                       if (!is_temp_tuple) {\r
-                                               if (!tup.heap_resident) {\r
-                                                        char* data = (char*)malloc(tup.tuple_size);\r
-                                                        memcpy(data, tup.data, tup.tuple_size);\r
-                                                        tup.data = data;\r
-                                                        tup.heap_resident = true;\r
-                                                }\r
-                                               func.xform_tuple(tup);\r
-                                               tup.channel = output_channel;\r
-                                               result.push_back(tup);\r
-                                       }\r
-                               }\r
-                               else {\r
-                                       if (!is_temp_tuple)\r
-                                               append_tuple(tup);              // put arrived tuple in the queue\r
-                                       wait_channel = 1 - tup.channel; // now we want the tuple from other channel\r
-                               }\r
-                       }\r
-\r
-               }\r
-\r
-               // temp status tuples emited by merge don't serve any other purpose\r
-               // other than tracing the flow of tuples\r
-               if (is_temp_tuple) {\r
-                       host_tuple temp_tup;\r
-                       if (!func.create_temp_status_tuple(temp_tup)) {\r
-                               temp_tup.channel = output_channel;\r
-                               result.push_back(temp_tup);\r
-                       }\r
-                       // clear memory of heap-resident temporal tuples\r
-                       tup.free_tuple();\r
-               }\r
-\r
-               if (!merge_qsize)\r
-                       wait_channel = -1;\r
-\r
-               return 0;\r
-       }\r
-\r
-       int flush(list<host_tuple>& result) {\r
-\r
-               if (merge_queue.empty())\r
-                       return 0;\r
-\r
-               host_tuple top_tuple;\r
-               list<host_tuple>::iterator iter;\r
-               for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {\r
-                       top_tuple = *iter;\r
-                       func.update_stored_temp_status(top_tuple,top_tuple.channel);\r
-                       func.xform_tuple(top_tuple);\r
-                       top_tuple.channel = output_channel;\r
-                       result.push_back(top_tuple);\r
-               }\r
-\r
-               queue_mem = 0;\r
-\r
-               return 0;\r
-       }\r
-\r
-       int set_param_block(int sz, void * value) {\r
-                       return 0;\r
-       }\r
-\r
-\r
-       int get_temp_status(host_tuple& result) {\r
-               result.channel = output_channel;\r
-               return func.create_temp_status_tuple(result);\r
-       }\r
-\r
-\r
-       int get_blocked_status () {\r
-               if (merge_qsize> soft_queue_size_limit)\r
-                       return wait_channel;\r
-               else\r
-                       return -1;\r
-       }\r
-\r
-       unsigned int get_mem_footprint() {\r
-               return queue_mem;\r
-       }\r
-\r
-};\r
-\r
-#endif // MERGE_OPERATOR_H\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ ------------------------------------------- */
+
+#ifndef MERGE_OPERATOR_H
+#define MERGE_OPERATOR_H
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+using namespace std;
+
+#include <stdio.h>
+
+
+//int last_tb = 0;
+
+template <class merge_functor>
+class merge_operator : public base_operator {
+private :
+       // input channel on which we want to receive the next tuple
+       // value -1 indicates that we never received a single tuple
+       // and we are not concerned on which channel tuple will arrive
+       int wait_channel;
+
+       // list of tuples from one of the channel waiting to be compared
+       // against tuple from the other channel
+       list<host_tuple> merge_queue;
+       int merge_qsize;
+
+       // comparator object used to compare the tuples
+       merge_functor func;
+
+       // soft limit on queue size - we consider operator blocked on its input
+       // whenever we reach this soft limit (not used anymore)
+       size_t soft_queue_size_limit;
+
+       int dropped_cnt;
+
+       // memory footprint for the merge queue in bytes
+       unsigned int queue_mem;
+
+       // appends tuple to the end of the merge_queue
+       // if tuple is stack resident, makes it heap resident
+       void append_tuple(host_tuple& tup) {
+               if (!tup.heap_resident) {
+                       char* data = (char*)malloc(tup.tuple_size);
+                       memcpy(data, tup.data, tup.tuple_size);
+                       tup.data = data;
+                       tup.heap_resident = true;
+               }
+               merge_queue.push_back(tup);
+               merge_qsize++;
+               queue_mem += tup.tuple_size;
+       }
+
+       void purge_queue(int channel, list<host_tuple>& result){
+               if (merge_queue.empty())
+                       return;
+               host_tuple top_tuple = merge_queue.front();
+               // remove all the tuple smaller than arrived tuple
+               while(func.compare_stored_with_temp_status(top_tuple, 1-channel) <= 0) {
+                       merge_queue.pop_front();
+                       merge_qsize--;
+                       queue_mem -= top_tuple.tuple_size;
+                       if(merge_qsize<0) abort();
+                       func.update_stored_temp_status(top_tuple,channel);
+                       func.xform_tuple(top_tuple);
+                       top_tuple.channel = output_channel;
+                       result.push_back(top_tuple);
+
+                       if (merge_queue.empty())
+                               break;
+
+                       top_tuple = merge_queue.front();
+                       func.update_stored_temp_status(top_tuple,channel);
+               }
+       }
+
+
+
+public:
+
+       merge_operator(int schema_handle1, int schema_handle2, size_t size_limit, const char* name ) : base_operator(name), func(schema_handle1) {
+               wait_channel = -1;
+               soft_queue_size_limit = size_limit;
+               merge_qsize = 0;
+               dropped_cnt = 0;
+               queue_mem = 0;
+       }
+
+
+       int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+               int last_channel;
+
+               if (tup.channel > 1) {
+                       fprintf(stderr, "Illegal channel number %d for two-way merge\n", tup.channel);
+                       return 0;
+               }
+
+               /* reject "out of order" tuple - we can receive those after forced flush */
+               func.get_timestamp(tup);
+               int res = func.compare_with_temp_status(tup.channel);
+               bool is_temp_tuple = func.temp_status_received(tup);
+
+/*
+//                     Ignore temp tuples until we can fix their timestamps.
+if(is_temp_tuple){
+ tup.free_tuple();
+ return 0;
+}*/
+               if (res < 0){   // out of order tuple
+
+                       if (++dropped_cnt % 100000 == 0) {
+                               gslog(LOG_ALERT, "%d tuples dropped by %s  merge\n", dropped_cnt,get_name());
+                       }
+                       //if(func.print_warnings())
+                       //      fprintf(stderr,"Warning: merge %s receives an out-of-order tuple on channel %d.\n", get_name(), tup.channel);
+                       // free tuple memory
+                       tup.free_tuple();
+                       return 0;
+               } else {
+
+                       if (wait_channel < 0 || wait_channel != tup.channel) {
+                               if (wait_channel < 0)
+                                       func.update_temp_status(tup);
+
+                               if (!is_temp_tuple){
+                                       if(func.compare_with_temp_status(1-tup.channel) <= 0){
+                                               if (!tup.heap_resident) {
+                                                       char* data = (char*)malloc(tup.tuple_size);
+                                                       memcpy(data, tup.data, tup.tuple_size);
+                                                       tup.data = data;
+                                                       tup.heap_resident = true;
+                                               }
+                                               func.xform_tuple(tup);
+                                               tup.channel = output_channel;
+                                               result.push_back(tup);
+                                       }else{
+                                               append_tuple(tup);              // put arrived tuple in the queue
+                                       }
+                               }
+//                             func.update_temp_status_by_slack(tup, 1-tup.channel);
+//                             purge_queue(tup.channel, result);
+
+                               wait_channel = 1-tup.channel;
+                       }else{
+       //                              If possible, clear tuples from the other queue.
+                               func.update_temp_status(tup);
+                               purge_queue(1-tup.channel, result);
+
+                               if(func.compare_with_temp_status(1-tup.channel) <= 0) { // other tuples in the queue are larger that arrived tuple
+                                       if (!is_temp_tuple) {
+                                               if (!tup.heap_resident) {
+                                                        char* data = (char*)malloc(tup.tuple_size);
+                                                        memcpy(data, tup.data, tup.tuple_size);
+                                                        tup.data = data;
+                                                        tup.heap_resident = true;
+                                                }
+                                               func.xform_tuple(tup);
+                                               tup.channel = output_channel;
+                                               result.push_back(tup);
+                                       }
+                               }
+                               else {
+                                       if (!is_temp_tuple)
+                                               append_tuple(tup);              // put arrived tuple in the queue
+                                       wait_channel = 1 - tup.channel; // now we want the tuple from other channel
+                               }
+                       }
+
+               }
+
+               // temp status tuples emited by merge don't serve any other purpose
+               // other than tracing the flow of tuples
+               if (is_temp_tuple) {
+                       host_tuple temp_tup;
+                       if (!func.create_temp_status_tuple(temp_tup)) {
+                               temp_tup.channel = output_channel;
+                               result.push_back(temp_tup);
+                       }
+                       // clear memory of heap-resident temporal tuples
+                       tup.free_tuple();
+               }
+
+               if (!merge_qsize)
+                       wait_channel = -1;
+
+               return 0;
+       }
+
+       int flush(list<host_tuple>& result) {
+
+               if (merge_queue.empty())
+                       return 0;
+
+               host_tuple top_tuple;
+               list<host_tuple>::iterator iter;
+               for (iter = merge_queue.begin(); iter != merge_queue.end(); iter++) {
+                       top_tuple = *iter;
+                       func.update_stored_temp_status(top_tuple,top_tuple.channel);
+                       func.xform_tuple(top_tuple);
+                       top_tuple.channel = output_channel;
+                       result.push_back(top_tuple);
+               }
+
+               queue_mem = 0;
+
+               return 0;
+       }
+
+       int set_param_block(int sz, void * value) {
+                       return 0;
+       }
+
+
+       int get_temp_status(host_tuple& result) {
+               result.channel = output_channel;
+               return func.create_temp_status_tuple(result);
+       }
+
+
+       int get_blocked_status () {
+               if (merge_qsize> soft_queue_size_limit)
+                       return wait_channel;
+               else
+                       return -1;
+       }
+
+       unsigned int get_mem_footprint() {
+               return queue_mem;
+       }
+
+};
+
+#endif // MERGE_OPERATOR_H