1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef FILE_OUTPUT_OPERATOR_H
17 #define FILE_OUTPUT_OPERATOR_H
28 #include <schemaparser.h>
30 #include "host_tuple.h"
31 #include "base_operator.h"
37 #include <schemaparser.h>
41 #ifndef FILESTREAM_OPERATOR_UTILS
42 #define FILESTREAM_OPERATOR_UTILS
43 string fs_int_to_string(int i){
46 sprintf(tmpstr,"%d",i);
52 template <class file_output_functor> class file_output_operator :
56 file_output_functor func;
60 vector<string> xform_command;
61 vector<string> temp_fname;
66 gs_uint32_t n_file_streams;
72 file_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {
76 do_compression = func.do_compression();
77 n_file_streams = func.num_file_streams();
78 output_file = (FILE **)(malloc(sizeof(FILE *)*n_file_streams));
79 for(i=0;i<n_file_streams;++i){
80 output_file[i] = NULL;
81 xform_command.push_back("");
82 temp_fname.push_back("");
87 sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);
95 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
100 if(! func.temp_status_received(tup)){ // this operator doesn't write
101 // Check for EOF conditions.
102 if(func.is_eof_tuple()){
103 if(output_file[0] != NULL){
104 for(i=0;i<n_file_streams;++i){
105 fclose(output_file[i]);
106 output_file[i] = NULL;
108 system(xform_command[i].c_str());
110 rename(temp_fname[i].c_str(), fname[i].c_str());
113 if(func.propagate_tuple()){
114 tup.channel = output_channel;
115 result.push_back(tup);
120 if(func.new_epoch()){
121 if(output_file[0] != 0){
123 for(i=0;i<n_file_streams;++i){
124 fclose(output_file[i]);
126 system(xform_command[i].c_str());
128 rename(temp_fname[i].c_str(), fname[i].c_str());
133 fname_base = func.get_filename_base();
134 for(i=0;i<n_file_streams;++i){
135 if(n_file_streams > 1){
136 temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
137 fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt";
139 temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
140 fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt";
142 if (do_compression) {
143 xform_command[i] = "gzip -S .tmpgz "+temp_fname[i]+" ; mv "+temp_fname[i]+".tmpgz "+fname[i]+".gz";
146 // xform_command = "mv "+temp_fname+" "+fname;
148 if ((output_file[i]=fopen(temp_fname[i].c_str(),"w"))==0) {
149 gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());
152 fwrite(file_header.c_str(),strlen(file_header.c_str()),1,output_file[i]);
153 fwrite(schema.c_str(),strlen(schema.c_str())+1,1,output_file[i]);
157 gs_uint32_t sz = tup.tuple_size;
158 gs_uint32_t nsz = htonl(tup.tuple_size);
159 gs_uint32_t file_hash = func.output_hash();
160 total_written += sizeof(gs_uint32_t) + sz;
162 if (fwrite(&nsz,sizeof(gs_uint32_t),1,output_file[file_hash])!=1) {
163 gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());
167 if (fwrite(tup.data,sz,1,output_file[file_hash])!=1) {
168 gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sz, temp_fname[file_hash].c_str());
171 if(func.propagate_tuple()){
172 tup.channel = output_channel;
173 result.push_back(tup);
178 if (!func.create_temp_status_tuple(res)) {
179 res.channel = output_channel;
180 result.push_back(res);
184 //fprintf(stderr,"\tis temp tuple\n");
187 if(func.propagate_tuple()){
188 tup.channel = output_channel;
189 result.push_back(tup);
195 virtual int flush(list<host_tuple>& result) {
199 virtual int set_param_block(int sz, void * value) {
200 // func.set_param_block(sz, value);
204 virtual int get_temp_status(host_tuple& result) {
205 result.channel = output_channel;
206 return func.create_temp_status_tuple(result);
209 virtual int get_blocked_status () {
210 return -1; // selection operators are not blocked on any input
215 #endif // FILE_OUTPUT_OPERATOR_H