Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / file_output_operator.h
index a446dc6..065f6c4 100644 (file)
-/* ------------------------------------------------
-Copyright 2014 AT&T Intellectual Property
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
- ------------------------------------------- */
-
-#ifndef FILE_OUTPUT_OPERATOR_H
-#define FILE_OUTPUT_OPERATOR_H
-
-#include <app.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <signal.h>
-
-#include "gsconfig.h"
-#include "gstypes.h"
-#include <schemaparser.h>
-
-#include "host_tuple.h"
-#include "base_operator.h"
-#include <list>
-#include<string>
-#include<vector>
-#include "gsconfig.h"
-#include "gstypes.h"
-#include <schemaparser.h>
-
-using namespace std;
-
-#ifndef FILESTREAM_OPERATOR_UTILS
-#define FILESTREAM_OPERATOR_UTILS
-// Returns the decimal text of i, formatted with "%d".
-// NOTE(review): this is a non-inline definition in a header; the
-// FILESTREAM_OPERATOR_UTILS guard around it only prevents a second
-// definition inside a single translation unit.
-string fs_int_to_string(int i){
-    string ret;
-    char tmpstr[100];
-    sprintf(tmpstr,"%d",i);
-    ret=tmpstr;
-    return(ret);
-}
-#endif
-
-// Operator that writes incoming tuples to one of n_file_streams output
-// files and optionally forwards them to output_channel.
-//
-// Every policy decision is delegated to file_output_functor. The code
-// below calls do_compression(), num_file_streams(),
-// temp_status_received(), is_eof_tuple(), new_epoch(),
-// get_filename_base(), output_hash(), propagate_tuple() and
-// create_temp_status_tuple() on it.
-//
-// Files are written as "<base>_stream<i>.txt.tmp" and, when closed, are
-// either renamed to "<base>_stream<i>.txt" or handed to a gzip/mv shell
-// command that produces "<base>_stream<i>.txt.gz".
-template <class file_output_functor> class file_output_operator :
-               public base_operator
-{
-private :
-       file_output_functor func;
-
-//             malloc'ed array of n_file_streams streams; NULL while closed.
-//             NOTE(review): never freed -- no destructor is declared.
-       FILE **output_file;
-       bool do_compression;
-//             Per-stream shell command, temporary name and final name.
-       vector<string> xform_command;
-       vector<string> temp_fname;
-       vector<string> fname;
-       string qname;
-       string schema;
-       string file_header;
-       gs_uint32_t n_file_streams;
-
-//             Running byte count; written below but never read in this class.
-int total_written;
-
-public:
-
-// Caches the functor's settings, allocates the (initially closed) stream
-// table and builds the "GDAT" text header that starts every output file.
-// sch is stored as the schema text and written after the header.
-file_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {
-       char tmpstr[1000];
-       int i;
-
-       do_compression = func.do_compression();
-       n_file_streams = func.num_file_streams();
-//             NOTE(review): the malloc result is not checked.
-       output_file = (FILE **)(malloc(sizeof(FILE *)*n_file_streams));
-       for(i=0;i<n_file_streams;++i){
-               output_file[i] = NULL;
-               xform_command.push_back("");
-               temp_fname.push_back("");
-               fname.push_back("");
-       }
-       qname = name;
-       schema = sch;
-//             SCHEMALENGTH counts the schema text plus its terminating NUL.
-       sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);
-       file_header = tmpstr;
-
-total_written = 0;
-}
-
-
-
-// Handles one input tuple. Always returns 0.
-//   tup    - the incoming tuple; its channel is overwritten when forwarded.
-//   result - receives tup whenever func.propagate_tuple() is true.
-// Tuples for which func.temp_status_received() is true are not written
-// to any file (presumably temporal-status tuples -- confirm in functor).
-virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
-string fname_base;
-host_tuple res;
-gs_uint32_t i;
-
-       if(! func.temp_status_received(tup)){  // this operator doesn't write
-//                     Check for EOF conditions.
-               if(func.is_eof_tuple()){
-//                     Stream 0 stands in for all streams: they are opened together.
-                       if(output_file[0] != NULL){
-                               for(i=0;i<n_file_streams;++i){
-                                       fclose(output_file[i]);
-                                       output_file[i] = NULL;
-                                       if(do_compression){
-                                               system(xform_command[i].c_str());
-                                       }
-//                     NOTE(review): unlike the new-epoch path below, this rename
-//                     also runs after the compression command.
-                                       rename(temp_fname[i].c_str(), fname[i].c_str());
-                               }
-                       }
-                       if(func.propagate_tuple()){
-                               tup.channel = output_channel;
-                               result.push_back(tup);
-                       }
-                       return 0;
-               }
-
-//                     On a new epoch: close and publish the current files, then
-//                     open a fresh set named after func.get_filename_base().
-               if(func.new_epoch()){
-                       if(output_file[0] != 0){
-total_written = 0;
-                               for(i=0;i<n_file_streams;++i){
-                                       fclose(output_file[i]);
-                                       if(do_compression){
-                                               system(xform_command[i].c_str());
-                                       }else{
-                                       rename(temp_fname[i].c_str(), fname[i].c_str());
-                                       }
-                               }
-                       }
-
-                       fname_base = func.get_filename_base();
-                       for(i=0;i<n_file_streams;++i){
-//                     NOTE(review): both branches build identical names.
-                               if(n_file_streams > 1){
-                                       temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
-                                       fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";
-                               }else{
-                                       temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
-                                       fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";
-                               }
-                               if (do_compression) {
-                                       xform_command[i] = "gzip -S .tmpgz "+temp_fname[i]+" ; mv "+temp_fname[i]+".tmpgz "+fname[i]+".gz";
-                               }
-//                     else{
-//                             xform_command = "mv "+temp_fname+" "+fname;
-//                     }
-                               if ((output_file[i]=fopen(temp_fname[i].c_str(),"w"))==0) {
-                                       gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());
-                                       exit(1);
-                               }
-//                     Each file starts with the text header, then the schema
-//                     including its terminating NUL. Results are not checked.
-                               fwrite(file_header.c_str(),strlen(file_header.c_str()),1,output_file[i]);
-                               fwrite(schema.c_str(),strlen(schema.c_str())+1,1,output_file[i]);
-                       }
-               }
-
-//                     Record layout: 32-bit length in network byte order (htonl),
-//                     followed by the raw tuple bytes.
-               gs_uint32_t sz = tup.tuple_size;
-               gs_uint32_t nsz = htonl(tup.tuple_size);
-//                     NOTE(review): file_hash is used as an index without a range
-//                     check, and the stream is not checked for NULL.
-               gs_uint32_t file_hash = func.output_hash();
-total_written += sizeof(gs_uint32_t) + sz;
-
-               if (fwrite(&nsz,sizeof(gs_uint32_t),1,output_file[file_hash])!=1) {
-                       gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());
-                       exit(1);
-               }
-
-               if (fwrite(tup.data,sz,1,output_file[file_hash])!=1) {
-                       gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sz, temp_fname[file_hash].c_str());
-                       exit(1);
-               }
-               if(func.propagate_tuple()){
-                       tup.channel = output_channel;
-                       result.push_back(tup);
-               }
-               return 0;
-       }else{
-/*
-               if (!func.create_temp_status_tuple(res)) {
-                       res.channel = output_channel;
-                       result.push_back(res);
-               }
-*/
-
-//fprintf(stderr,"\tis temp tuple\n");
-       }
-
-//             Reached only from the else-branch above: forward without writing.
-       if(func.propagate_tuple()){
-               tup.channel = output_channel;
-               result.push_back(tup);
-       }
-       return 0;
-}
-
-
-//             Nothing is buffered by this operator; always returns 0.
-       virtual int flush(list<host_tuple>& result) {
-               return 0;
-       }
-
-//             Parameter blocks are ignored; always returns 0.
-       virtual int set_param_block(int sz, void * value) {
-//             func.set_param_block(sz, value);
-               return 0;
-       }
-
-//             Fills result via the functor and tags it with output_channel;
-//             returns whatever func.create_temp_status_tuple() returns.
-       virtual int get_temp_status(host_tuple& result) {
-               result.channel = output_channel;
-               return func.create_temp_status_tuple(result);
-       }
-
-       virtual int get_blocked_status () {
-               return -1;              // selection operators are not blocked on any input
-       }
-};
-
-
-#endif // FILE_OUTPUT_OPERATOR_H
-
+/* ------------------------------------------------\r
+Copyright 2014 AT&T Intellectual Property\r
+   Licensed under the Apache License, Version 2.0 (the "License");\r
+   you may not use this file except in compliance with the License.\r
+   You may obtain a copy of the License at\r
+\r
+     http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#ifndef FILE_OUTPUT_OPERATOR_H\r
+#define FILE_OUTPUT_OPERATOR_H\r
+\r
+#include <app.h>\r
+#include <stdlib.h>\r
+#include <string.h>\r
+#include <stdio.h>\r
+#include <unistd.h>\r
+#include <signal.h>\r
+\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include <schemaparser.h>\r
+\r
+#include "host_tuple.h"\r
+#include "base_operator.h"\r
+#include <list>\r
+#include<string>\r
+#include<vector>\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include <schemaparser.h>\r
+\r
+using namespace std;\r
+\r
+#ifndef FILESTREAM_OPERATOR_UTILS\r
+#define FILESTREAM_OPERATOR_UTILS\r
+// Returns the decimal representation of i.\r
+//\r
+// Declared inline because it is defined in a header: without it, every\r
+// translation unit that includes this file emits its own external\r
+// definition and the program fails to link. The surrounding\r
+// FILESTREAM_OPERATOR_UTILS guard only protects a single translation unit.\r
+inline std::string fs_int_to_string(int i){\r
+    char tmpstr[32];    // ample for any int, sign and terminating NUL included\r
+    snprintf(tmpstr,sizeof(tmpstr),"%d",i);\r
+    return std::string(tmpstr);\r
+}\r
+#endif\r
+\r
+/*\r
+ * file_output_operator\r
+ *\r
+ * Writes every data tuple it receives to one of n_file_streams output files\r
+ * and optionally forwards the tuple on output_channel.\r
+ *\r
+ * All policy is delegated to file_output_functor, which must provide\r
+ * do_compression(), num_file_streams(), temp_status_received(tup),\r
+ * is_eof_tuple(), new_epoch(), get_filename_base(), output_hash(),\r
+ * propagate_tuple() and create_temp_status_tuple(tup).\r
+ *\r
+ * File life cycle: at each new epoch one "<base>_stream<i>.txt.tmp" file is\r
+ * opened per stream and primed with the GDAT header and the schema.  When\r
+ * the next epoch starts, or an EOF tuple arrives, each file is closed and\r
+ * published: with compression it is gzip'ed to "<base>_stream<i>.txt.gz",\r
+ * otherwise (or if the compression command fails) it is renamed to\r
+ * "<base>_stream<i>.txt".\r
+ *\r
+ * Record format: 32-bit tuple length in network byte order followed by the\r
+ * raw tuple bytes.\r
+ */\r
+template <class file_output_functor> class file_output_operator :\r
+               public base_operator\r
+{\r
+private :\r
+       file_output_functor func;\r
+\r
+       vector<FILE *> output_file;     // stream i; NULL while no file is open\r
+       bool do_compression;\r
+       vector<string> xform_command;   // shell command that compresses and publishes stream i\r
+       vector<string> temp_fname;      // name stream i is written under\r
+       vector<string> fname;           // name stream i is published under\r
+       string qname;\r
+       string schema;\r
+       string file_header;\r
+       gs_uint32_t n_file_streams;\r
+\r
+       size_t total_written;           // bytes written in the current epoch (bookkeeping only)\r
+\r
+//             Write nbytes from buf to stream i; a short write is fatal.\r
+//             Zero-length writes are skipped: fwrite() reports 0 items for\r
+//             them, which would otherwise be mistaken for a failure.\r
+       void write_or_die(const void *buf, size_t nbytes, gs_uint32_t i){\r
+               if(nbytes == 0)\r
+                       return;\r
+               if(fwrite(buf,nbytes,1,output_file[i]) != 1){\r
+                       gslog(LOG_EMERG,"file_output_operator: Could not write %lu bytes to output \"%s\" .. EXITING\n", (unsigned long)nbytes, temp_fname[i].c_str());\r
+                       exit(1);\r
+               }\r
+       }\r
+\r
+//             Derive the names for stream i from fname_base, open the temporary\r
+//             file and write the header and the NUL-terminated schema to it.\r
+//             Failure to open or write is fatal.\r
+       void open_stream(gs_uint32_t i, const string &fname_base){\r
+               const string stem = fname_base + "_stream" + fs_int_to_string(i);\r
+               temp_fname[i] = stem + ".txt.tmp";\r
+               fname[i] = stem + ".txt";\r
+               if(do_compression){\r
+                       xform_command[i] = "gzip -S .tmpgz "+temp_fname[i]+" ; mv "+temp_fname[i]+".tmpgz "+fname[i]+".gz";\r
+               }\r
+               output_file[i] = fopen(temp_fname[i].c_str(),"w");\r
+               if(output_file[i] == NULL){\r
+                       gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());\r
+                       exit(1);\r
+               }\r
+               write_or_die(file_header.c_str(), file_header.size(), i);\r
+               write_or_die(schema.c_str(), schema.size()+1, i);\r
+       }\r
+\r
+//             Close stream i, if open, and publish its file.  Problems are\r
+//             logged but not fatal.  If the compression command fails the\r
+//             temporary file is renamed to its uncompressed final name so the\r
+//             data is not left stranded under the .tmp name.\r
+       void finish_stream(gs_uint32_t i){\r
+               if(output_file[i] == NULL)\r
+                       return;\r
+               if(fclose(output_file[i]) != 0){\r
+                       gslog(LOG_EMERG,"file_output_operator: Error closing file \"%s\"\n", temp_fname[i].c_str());\r
+               }\r
+               output_file[i] = NULL;\r
+               if(do_compression){\r
+                       if(system(xform_command[i].c_str()) == 0)\r
+                               return;\r
+                       gslog(LOG_EMERG,"file_output_operator: Command \"%s\" failed, keeping uncompressed output\n", xform_command[i].c_str());\r
+               }\r
+               if(rename(temp_fname[i].c_str(), fname[i].c_str()) != 0){\r
+                       gslog(LOG_EMERG,"file_output_operator: Could not rename \"%s\" to \"%s\"\n", temp_fname[i].c_str(), fname[i].c_str());\r
+               }\r
+       }\r
+\r
+//             Forward tup on output_channel if the functor asks for it.\r
+       void forward_tuple(host_tuple& tup, list<host_tuple>& result){\r
+               if(func.propagate_tuple()){\r
+                       tup.channel = output_channel;\r
+                       result.push_back(tup);\r
+               }\r
+       }\r
+\r
+public:\r
+\r
+//     schema_handle is passed to the functor, name to base_operator; sch is\r
+//     the schema text written (with its terminating NUL) into every file.\r
+file_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {\r
+       char tmpstr[1000];\r
+\r
+       do_compression = func.do_compression();\r
+       n_file_streams = func.num_file_streams();\r
+       output_file.assign(n_file_streams, (FILE *)NULL);\r
+       xform_command.assign(n_file_streams, string());\r
+       temp_fname.assign(n_file_streams, string());\r
+       fname.assign(n_file_streams, string());\r
+       qname = name;\r
+       schema = sch;\r
+//             SCHEMALENGTH counts the schema text plus its terminating NUL.\r
+       snprintf(tmpstr,sizeof(tmpstr),"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),(unsigned long)(strlen(sch)+1));\r
+       file_header = tmpstr;\r
+\r
+       total_written = 0;\r
+}\r
+\r
+\r
+\r
+//     Handle one input tuple.  Always returns 0.\r
+//       tup    - incoming tuple; its channel is overwritten when forwarded.\r
+//       result - receives tup whenever func.propagate_tuple() is true.\r
+//     Tuples for which func.temp_status_received() is true are forwarded\r
+//     but never written to a file.\r
+virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
+       gs_uint32_t i;\r
+\r
+       if(! func.temp_status_received(tup)){\r
+//                     EOF: publish whatever is open and stop writing.\r
+               if(func.is_eof_tuple()){\r
+                       for(i=0;i<n_file_streams;++i){\r
+                               finish_stream(i);\r
+                       }\r
+                       forward_tuple(tup, result);\r
+                       return 0;\r
+               }\r
+\r
+//                     New epoch: publish the previous files and start a new set.\r
+               if(func.new_epoch()){\r
+                       if(!output_file.empty() && output_file[0] != NULL){\r
+                               total_written = 0;\r
+                       }\r
+                       for(i=0;i<n_file_streams;++i){\r
+                               finish_stream(i);\r
+                       }\r
+\r
+                       const string fname_base = func.get_filename_base();\r
+                       for(i=0;i<n_file_streams;++i){\r
+                               open_stream(i, fname_base);\r
+                       }\r
+               }\r
+\r
+               const gs_uint32_t sz = tup.tuple_size;\r
+               const gs_uint32_t nsz = htonl(sz);\r
+               const gs_uint32_t file_hash = func.output_hash();\r
+\r
+//                     A tuple can only be written to a stream that exists and is\r
+//                     open; anything else would dereference a bad FILE pointer.\r
+               if(file_hash >= n_file_streams || output_file[file_hash] == NULL){\r
+                       gslog(LOG_EMERG,"file_output_operator: No open output file for stream %u .. EXITING\n", (unsigned int)file_hash);\r
+                       exit(1);\r
+               }\r
+               total_written += sizeof(gs_uint32_t) + sz;\r
+\r
+               write_or_die(&nsz, sizeof(nsz), file_hash);\r
+               write_or_die(tup.data, sz, file_hash);\r
+       }\r
+\r
+       forward_tuple(tup, result);\r
+       return 0;\r
+}\r
+\r
+\r
+//             Nothing is buffered by this operator; always returns 0.\r
+       virtual int flush(list<host_tuple>& result) {\r
+               return 0;\r
+       }\r
+\r
+//             Parameter blocks are ignored; always returns 0.\r
+       virtual int set_param_block(int sz, void * value) {\r
+               return 0;\r
+       }\r
+\r
+//             Fill result via the functor, tagged with output_channel; returns\r
+//             whatever func.create_temp_status_tuple() returns.\r
+       virtual int get_temp_status(host_tuple& result) {\r
+               result.channel = output_channel;\r
+               return func.create_temp_status_tuple(result);\r
+       }\r
+\r
+       virtual int get_blocked_status () {\r
+               return -1;              // selection operators are not blocked on any input\r
+       }\r
+};\r
+\r
+\r
+#endif // FILE_OUTPUT_OPERATOR_H\r
+\r