1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #ifndef ZFILE_OUTPUT_OPERATOR_H
17 #define ZFILE_OUTPUT_OPERATOR_H
28 #include <schemaparser.h>
30 #include "host_tuple.h"
31 #include "base_operator.h"
37 #include <schemaparser.h>
#ifndef FILESTREAM_OPERATOR_UTILS
#define FILESTREAM_OPERATOR_UTILS
/**
 * Format an integer as its decimal text representation.
 *
 * Declared `inline` because the definition lives in a header: the
 * FILESTREAM_OPERATOR_UTILS guard only prevents a second definition inside
 * one translation unit, so without `inline` every translation unit that
 * includes this header would emit its own external definition and the link
 * would fail with a multiple-definition error.
 *
 * @param i  value to format
 * @return   decimal representation of i (leading '-' for negative values)
 */
inline std::string fs_int_to_string(int i){
	// Sign, at most 10 digits and the terminating NUL fit a 32-bit int;
	// the buffer is sized generously and the write is bounded regardless.
	char tmpstr[32];
	snprintf(tmpstr, sizeof(tmpstr), "%d", i);
	return std::string(tmpstr);
}
#endif // FILESTREAM_OPERATOR_UTILS
// zfile_output_operator: operator template, parameterised on a functor type,
// that writes incoming tuples to gzip streams through the zlib gz* calls
// (gzopen / gzwrite / gzclose). Each stream is written under a temporary name
// and renamed to its final name when the files are closed.
//
// NOTE(review): this listing carries a residual line number at the start of
// every line and omits lines (local/member declarations, else branches,
// returns, closing braces). The comments below describe only the statements
// that are visible; the nesting of some statements cannot be determined from
// here and should be confirmed against the complete file.
54 template <class file_output_functor> class zfile_output_operator :
// Functor supplying all policy decisions used below (stream count, EOF and
// epoch detection, filename base, output hash, temp-status handling).
58 file_output_functor func;
// One entry per file stream; only ever filled with "" in the visible code.
61 vector<string> xform_command;
// Per-stream temporary filename the stream is written to before the rename.
62 vector<string> temp_fname;
// Number of output streams, taken from func.num_file_streams().
67 gs_uint32_t n_file_streams;
// Constructor: forwards `name` to base_operator and `schema_handle` to the
// functor, then sizes the per-stream state.
//   schema_handle - passed unchanged to the functor's constructor
//   name          - passed unchanged to base_operator
//   sch           - schema text; its length (+1) is recorded in the header
73 zfile_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {
77 n_file_streams = func.num_file_streams();
// NOTE(review): the malloc result is not checked in the visible lines, and no
// matching free() is visible in this listing - confirm ownership/cleanup.
78 output_file = (gzFile *)(malloc(sizeof(gzFile )*n_file_streams));
// Start with every stream closed (NULL handle) and empty per-stream strings.
79 for(i=0;i<n_file_streams;++i){
80 output_file[i] = NULL;
81 xform_command.push_back("");
82 temp_fname.push_back("");
// Text header: magic "GDAT", schema parser version, schema length including
// the terminating NUL.
// NOTE(review): strlen() yields size_t; "%lu" only matches where size_t is
// unsigned long ("%zu" is the exact specifier).
87 sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);
// accept_tuple: handle one incoming tuple.
//   tup    - incoming tuple; its channel is overwritten with output_channel
//            when it is forwarded
//   result - list that receives any tuples this operator emits
// Visible behaviour: on EOF the open streams are closed and renamed; on a new
// epoch the streams are closed, renamed and reopened under new names; the
// tuple is then written as a 4-byte length followed by its payload.
95 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
100 if(! func.temp_status_received(tup)){ // this operator doesn't write
101 // Check for EOF conditions.
102 if(func.is_eof_tuple()){
// Only stream 0 is tested; all streams are then closed together.
103 if(output_file[0] != NULL){
104 for(i=0;i<n_file_streams;++i){
// NOTE(review): gzclose() and rename() results are ignored here, so a failed
// final flush would still publish the file under its final name.
105 gzclose(output_file[i]);
106 output_file[i] = NULL;
// Publish the finished file by moving it from the temp name to the final name.
107 rename(temp_fname[i].c_str(), fname[i].c_str());
// Forward the tuple downstream when the functor asks for it.
110 if(func.propagate_tuple()){
111 tup.channel = output_channel;
112 result.push_back(tup);
// Epoch rollover: finish the current files before opening the next set.
117 if(func.new_epoch()){
118 if(output_file[0] != 0){
120 for(i=0;i<n_file_streams;++i){
// Unlike the EOF path, no visible statement resets the handle to NULL after
// closing; the handle is reassigned by gzopen below.
121 gzclose(output_file[i]);
122 rename(temp_fname[i].c_str(), fname[i].c_str());
// Build the names for the new epoch: <base>_stream<i>.txt.gz(.tmp).
126 fname_base = func.get_filename_base();
127 for(i=0;i<n_file_streams;++i){
// NOTE(review): the two pairs of assignments below are textually identical,
// so in the visible code this test does not change the generated names -
// confirm whether the single-stream case was meant to drop the suffix.
128 if(n_file_streams > 1){
129 temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.gz.tmp";
130 fname[i] = fname_base + "_stream"+fs_int_to_string(i) + ".txt.gz";
132 temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.gz.tmp";
133 fname[i] = fname_base + "_stream"+fs_int_to_string(i) + ".txt.gz";
136 // xform_command = "mv "+temp_fname+" "+fname;
// Open the temp file for binary writing; a zero handle is logged at
// LOG_EMERG (what follows the log call is not visible in this listing).
138 if ((output_file[i]=gzopen(temp_fname[i].c_str(),"wb"))==0) {
139 gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());
// Every new file starts with the text header (no NUL) followed by the schema
// text including its terminating NUL (strlen + 1).
// NOTE(review): the results of these two writes are not checked.
142 gzwrite(output_file[i],file_header.c_str(),strlen(file_header.c_str()));
143 gzwrite(output_file[i],schema.c_str(),strlen(schema.c_str())+1);
// Record framing: tuple size converted with htonl(), then the payload bytes.
147 gs_uint32_t sz = tup.tuple_size;
148 gs_uint32_t nsz = htonl(tup.tuple_size);
// NOTE(review): file_hash indexes output_file and temp_fname directly; no
// bounds check against n_file_streams is visible - presumably the functor
// guarantees the range, confirm.
149 gs_uint32_t file_hash = func.output_hash();
150 total_written += sizeof(gs_uint32_t) + sz;
// A gzwrite() result of 0 is treated as failure.
151 if (gzwrite(output_file[file_hash],&nsz,sizeof(gs_uint32_t))==0) {
// NOTE(review): "%d" is paired with a size_t argument (sizeof) - mismatched
// printf-style specifier if gslog forwards to a printf-family formatter.
152 gslog(LOG_EMERG,"zfile_output_operator: Could not write %d bytes to output\"%s\".. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());
// NOTE(review): with sz == 0 this test would report a failure even if the
// write succeeded - confirm zero-length tuples cannot reach this point.
156 if (gzwrite(output_file[file_hash],tup.data,sz)==0) {
// NOTE(review): "%d" is paired with an unsigned 32-bit argument ("%u").
157 gslog(LOG_EMERG,"zfile_output_operator: Could not write %d bytes to output\"%s\".. EXITING\n", sz, temp_fname[file_hash].c_str());
160 if(func.propagate_tuple()){
161 tup.channel = output_channel;
162 result.push_back(tup);
// Emits `res` on output_channel when create_temp_status_tuple() returns 0.
// The declaration of `res` and the branch these lines belong to are not
// visible in this listing.
167 if (!func.create_temp_status_tuple(res)) {
168 res.channel = output_channel;
169 result.push_back(res);
173 //fprintf(stderr,"\tis temp tuple\n");
176 if(func.propagate_tuple()){
177 tup.channel = output_channel;
178 result.push_back(tup);
// flush: body not visible in this listing.
184 virtual int flush(list<host_tuple>& result) {
// set_param_block: the only visible line is the commented-out forward to the
// functor; the rest of the body is not visible.
188 virtual int set_param_block(int sz, void * value) {
189 // func.set_param_block(sz, value);
// get_temp_status: stamps output_channel on `result` and returns whatever the
// functor's create_temp_status_tuple() returns.
193 virtual int get_temp_status(host_tuple& result) {
194 result.channel = output_channel;
195 return func.create_temp_status_tuple(result);
// get_blocked_status: always returns -1.
198 virtual int get_blocked_status () {
199 return -1; // selection operators are not blocked on any input
204 #endif // ZFILE_OUTPUT_OPERATOR_H