Fixed newline characters throughout the code
[com/gs-lite.git] / include / hfta / file_output_operator.h
index 065f6c4..a446dc6 100644 (file)
-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
-   Licensed under the Apache License, Version 2.0 (the "License");\r
-   you may not use this file except in compliance with the License.\r
-   You may obtain a copy of the License at\r
-\r
-     http://www.apache.org/licenses/LICENSE-2.0\r
-\r
-   Unless required by applicable law or agreed to in writing, software\r
-   distributed under the License is distributed on an "AS IS" BASIS,\r
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-   See the License for the specific language governing permissions and\r
-   limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#ifndef FILE_OUTPUT_OPERATOR_H\r
-#define FILE_OUTPUT_OPERATOR_H\r
-\r
-#include <app.h>\r
-#include <stdlib.h>\r
-#include <string.h>\r
-#include <stdio.h>\r
-#include <unistd.h>\r
-#include <signal.h>\r
-\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include <schemaparser.h>\r
-\r
-#include "host_tuple.h"\r
-#include "base_operator.h"\r
-#include <list>\r
-#include<string>\r
-#include<vector>\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include <schemaparser.h>\r
-\r
-using namespace std;\r
-\r
-#ifndef FILESTREAM_OPERATOR_UTILS\r
-#define FILESTREAM_OPERATOR_UTILS\r
-string fs_int_to_string(int i){\r
-    string ret;\r
-    char tmpstr[100];\r
-    sprintf(tmpstr,"%d",i);\r
-    ret=tmpstr;\r
-    return(ret);\r
-}\r
-#endif\r
-\r
-template <class file_output_functor> class file_output_operator :\r
-               public base_operator\r
-{\r
-private :\r
-       file_output_functor func;\r
-\r
-       FILE **output_file;\r
-       bool do_compression;\r
-       vector<string> xform_command;\r
-       vector<string> temp_fname;\r
-       vector<string> fname;\r
-       string qname;\r
-       string schema;\r
-       string file_header;\r
-       gs_uint32_t n_file_streams;\r
-\r
-int total_written;\r
-\r
-public:\r
-\r
-file_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {\r
-       char tmpstr[1000];\r
-       int i;\r
-\r
-       do_compression = func.do_compression();\r
-       n_file_streams = func.num_file_streams();\r
-       output_file = (FILE **)(malloc(sizeof(FILE *)*n_file_streams));\r
-       for(i=0;i<n_file_streams;++i){\r
-               output_file[i] = NULL;\r
-               xform_command.push_back("");\r
-               temp_fname.push_back("");\r
-               fname.push_back("");\r
-       }\r
-       qname = name;\r
-       schema = sch;\r
-       sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);\r
-       file_header = tmpstr;\r
-\r
-total_written = 0;\r
-}\r
-\r
-\r
-\r
-virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
-string fname_base;\r
-host_tuple res;\r
-gs_uint32_t i;\r
-\r
-       if(! func.temp_status_received(tup)){  // this operator doesn't write\r
-//                     Check for EOF conditions.\r
-               if(func.is_eof_tuple()){\r
-                       if(output_file[0] != NULL){\r
-                               for(i=0;i<n_file_streams;++i){\r
-                                       fclose(output_file[i]);\r
-                                       output_file[i] = NULL;\r
-                                       if(do_compression){\r
-                                               system(xform_command[i].c_str());\r
-                                       }\r
-                                       rename(temp_fname[i].c_str(), fname[i].c_str());\r
-                               }\r
-                       }\r
-                       if(func.propagate_tuple()){\r
-                               tup.channel = output_channel;\r
-                               result.push_back(tup);\r
-                       }\r
-                       return 0;\r
-               }\r
-\r
-               if(func.new_epoch()){\r
-                       if(output_file[0] != 0){\r
-total_written = 0;\r
-                               for(i=0;i<n_file_streams;++i){\r
-                                       fclose(output_file[i]);\r
-                                       if(do_compression){\r
-                                               system(xform_command[i].c_str());\r
-                                       }else{\r
-                                       rename(temp_fname[i].c_str(), fname[i].c_str());\r
-                                       }\r
-                               }\r
-                       }\r
-\r
-                       fname_base = func.get_filename_base();\r
-                       for(i=0;i<n_file_streams;++i){\r
-                               if(n_file_streams > 1){\r
-                                       temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";\r
-                                       fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";\r
-                               }else{\r
-                                       temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";\r
-                                       fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";\r
-                               }\r
-                               if (do_compression) {\r
-                                       xform_command[i] = "gzip -S .tmpgz "+temp_fname[i]+" ; mv "+temp_fname[i]+".tmpgz "+fname[i]+".gz";\r
-                               }\r
-//                     else{\r
-//                             xform_command = "mv "+temp_fname+" "+fname;\r
-//                     }\r
-                               if ((output_file[i]=fopen(temp_fname[i].c_str(),"w"))==0) {\r
-                                       gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());\r
-                                       exit(1);\r
-                               }\r
-                               fwrite(file_header.c_str(),strlen(file_header.c_str()),1,output_file[i]);\r
-                               fwrite(schema.c_str(),strlen(schema.c_str())+1,1,output_file[i]);\r
-                       }\r
-               }\r
-\r
-               gs_uint32_t sz = tup.tuple_size;\r
-               gs_uint32_t nsz = htonl(tup.tuple_size);\r
-               gs_uint32_t file_hash = func.output_hash();\r
-total_written += sizeof(gs_uint32_t) + sz;\r
-\r
-               if (fwrite(&nsz,sizeof(gs_uint32_t),1,output_file[file_hash])!=1) {\r
-                       gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());\r
-                       exit(1);\r
-               }\r
-\r
-               if (fwrite(tup.data,sz,1,output_file[file_hash])!=1) {\r
-                       gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sz, temp_fname[file_hash].c_str());\r
-                       exit(1);\r
-               }\r
-               if(func.propagate_tuple()){\r
-                       tup.channel = output_channel;\r
-                       result.push_back(tup);\r
-               }\r
-               return 0;\r
-       }else{\r
-/*\r
-               if (!func.create_temp_status_tuple(res)) {\r
-                       res.channel = output_channel;\r
-                       result.push_back(res);\r
-               }\r
-*/\r
-\r
-//fprintf(stderr,"\tis temp tuple\n");\r
-       }\r
-\r
-       if(func.propagate_tuple()){\r
-               tup.channel = output_channel;\r
-               result.push_back(tup);\r
-       }\r
-       return 0;\r
-}\r
-\r
-\r
-       virtual int flush(list<host_tuple>& result) {\r
-               return 0;\r
-       }\r
-\r
-       virtual int set_param_block(int sz, void * value) {\r
-//             func.set_param_block(sz, value);\r
-               return 0;\r
-       }\r
-\r
-       virtual int get_temp_status(host_tuple& result) {\r
-               result.channel = output_channel;\r
-               return func.create_temp_status_tuple(result);\r
-       }\r
-\r
-       virtual int get_blocked_status () {\r
-               return -1;              // selection operators are not blocked on any input\r
-       }\r
-};\r
-\r
-\r
-#endif // FILE_OUTPUT_OPERATOR_H\r
-\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ ------------------------------------------- */
+
+#ifndef FILE_OUTPUT_OPERATOR_H
+#define FILE_OUTPUT_OPERATOR_H
+
+#include <app.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include "gsconfig.h"
+#include "gstypes.h"
+#include <schemaparser.h>
+
+#include "host_tuple.h"
+#include "base_operator.h"
+#include <list>
+#include<string>
+#include<vector>
+#include "gsconfig.h"
+#include "gstypes.h"
+#include <schemaparser.h>
+
+using namespace std;
+
+#ifndef FILESTREAM_OPERATOR_UTILS
+#define FILESTREAM_OPERATOR_UTILS
+string fs_int_to_string(int i){
+    string ret;
+    char tmpstr[100];
+    sprintf(tmpstr,"%d",i);
+    ret=tmpstr;
+    return(ret);
+}
+#endif
+
+template <class file_output_functor> class file_output_operator :
+               public base_operator
+{
+private :
+       file_output_functor func;
+
+       FILE **output_file;
+       bool do_compression;
+       vector<string> xform_command;
+       vector<string> temp_fname;
+       vector<string> fname;
+       string qname;
+       string schema;
+       string file_header;
+       gs_uint32_t n_file_streams;
+
+int total_written;
+
+public:
+
+file_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {
+       char tmpstr[1000];
+       int i;
+
+       do_compression = func.do_compression();
+       n_file_streams = func.num_file_streams();
+       output_file = (FILE **)(malloc(sizeof(FILE *)*n_file_streams));
+       for(i=0;i<n_file_streams;++i){
+               output_file[i] = NULL;
+               xform_command.push_back("");
+               temp_fname.push_back("");
+               fname.push_back("");
+       }
+       qname = name;
+       schema = sch;
+       sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);
+       file_header = tmpstr;
+
+total_written = 0;
+}
+
+
+
+virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
+string fname_base;
+host_tuple res;
+gs_uint32_t i;
+
+       if(! func.temp_status_received(tup)){  // this operator doesn't write
+//                     Check for EOF conditions.
+               if(func.is_eof_tuple()){
+                       if(output_file[0] != NULL){
+                               for(i=0;i<n_file_streams;++i){
+                                       fclose(output_file[i]);
+                                       output_file[i] = NULL;
+                                       if(do_compression){
+                                               system(xform_command[i].c_str());
+                                       }
+                                       rename(temp_fname[i].c_str(), fname[i].c_str());
+                               }
+                       }
+                       if(func.propagate_tuple()){
+                               tup.channel = output_channel;
+                               result.push_back(tup);
+                       }
+                       return 0;
+               }
+
+               if(func.new_epoch()){
+                       if(output_file[0] != 0){
+total_written = 0;
+                               for(i=0;i<n_file_streams;++i){
+                                       fclose(output_file[i]);
+                                       if(do_compression){
+                                               system(xform_command[i].c_str());
+                                       }else{
+                                       rename(temp_fname[i].c_str(), fname[i].c_str());
+                                       }
+                               }
+                       }
+
+                       fname_base = func.get_filename_base();
+                       for(i=0;i<n_file_streams;++i){
+                               if(n_file_streams > 1){
+                                       temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
+                                       fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";
+                               }else{
+                                       temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
+                                       fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";
+                               }
+                               if (do_compression) {
+                                       xform_command[i] = "gzip -S .tmpgz "+temp_fname[i]+" ; mv "+temp_fname[i]+".tmpgz "+fname[i]+".gz";
+                               }
+//                     else{
+//                             xform_command = "mv "+temp_fname+" "+fname;
+//                     }
+                               if ((output_file[i]=fopen(temp_fname[i].c_str(),"w"))==0) {
+                                       gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());
+                                       exit(1);
+                               }
+                               fwrite(file_header.c_str(),strlen(file_header.c_str()),1,output_file[i]);
+                               fwrite(schema.c_str(),strlen(schema.c_str())+1,1,output_file[i]);
+                       }
+               }
+
+               gs_uint32_t sz = tup.tuple_size;
+               gs_uint32_t nsz = htonl(tup.tuple_size);
+               gs_uint32_t file_hash = func.output_hash();
+total_written += sizeof(gs_uint32_t) + sz;
+
+               if (fwrite(&nsz,sizeof(gs_uint32_t),1,output_file[file_hash])!=1) {
+                       gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());
+                       exit(1);
+               }
+
+               if (fwrite(tup.data,sz,1,output_file[file_hash])!=1) {
+                       gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sz, temp_fname[file_hash].c_str());
+                       exit(1);
+               }
+               if(func.propagate_tuple()){
+                       tup.channel = output_channel;
+                       result.push_back(tup);
+               }
+               return 0;
+       }else{
+/*
+               if (!func.create_temp_status_tuple(res)) {
+                       res.channel = output_channel;
+                       result.push_back(res);
+               }
+*/
+
+//fprintf(stderr,"\tis temp tuple\n");
+       }
+
+       if(func.propagate_tuple()){
+               tup.channel = output_channel;
+               result.push_back(tup);
+       }
+       return 0;
+}
+
+
+       virtual int flush(list<host_tuple>& result) {
+               return 0;
+       }
+
+       virtual int set_param_block(int sz, void * value) {
+//             func.set_param_block(sz, value);
+               return 0;
+       }
+
+       virtual int get_temp_status(host_tuple& result) {
+               result.channel = output_channel;
+               return func.create_temp_status_tuple(result);
+       }
+
+       virtual int get_blocked_status () {
+               return -1;              // selection operators are not blocked on any input
+       }
+};
+
+
+#endif // FILE_OUTPUT_OPERATOR_H
+