Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / include / hfta / file_output_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef FILE_OUTPUT_OPERATOR_H
17 #define FILE_OUTPUT_OPERATOR_H
18
19 #include <app.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <stdio.h>
23 #include <unistd.h>
24 #include <signal.h>
25
26 #include "gsconfig.h"
27 #include "gstypes.h"
28 #include <schemaparser.h>
29
30 #include "host_tuple.h"
31 #include "base_operator.h"
32 #include <list>
33 #include<string>
34 #include<vector>
35 #include "gsconfig.h"
36 #include "gstypes.h"
37 #include <schemaparser.h>
38
39 using namespace std;
40
41 #ifndef FILESTREAM_OPERATOR_UTILS
42 #define FILESTREAM_OPERATOR_UTILS
// Formats a signed int as a decimal string (e.g. -7 -> "-7").
// Declared inline because it is defined in a header: without inline, every
// translation unit that includes this header emits its own external
// definition and the link fails with duplicate symbols (the
// FILESTREAM_OPERATOR_UTILS guard only prevents redefinition within a single
// translation unit).
inline std::string fs_int_to_string(int i){
	char tmpstr[32];	// ample for any int; snprintf bounds the write regardless
	snprintf(tmpstr, sizeof(tmpstr), "%d", i);
	return std::string(tmpstr);
}
50 #endif
51
52 template <class file_output_functor> class file_output_operator :
53                 public base_operator
54 {
55 private :
56         file_output_functor func;
57
58         FILE **output_file;
59         bool do_compression;
60         vector<string> xform_command;
61         vector<string> temp_fname;
62         vector<string> fname;
63         string qname;
64         string schema;
65         string file_header;
66         gs_uint32_t n_file_streams;
67
68 int total_written;
69
70 public:
71
72 file_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {
73         char tmpstr[1000];
74         int i;
75
76         do_compression = func.do_compression();
77         n_file_streams = func.num_file_streams();
78         output_file = (FILE **)(malloc(sizeof(FILE *)*n_file_streams));
79         for(i=0;i<n_file_streams;++i){
80                 output_file[i] = NULL;
81                 xform_command.push_back("");
82                 temp_fname.push_back("");
83                 fname.push_back("");
84         }
85         qname = name;
86         schema = sch;
87         sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);
88         file_header = tmpstr;
89
90 total_written = 0;
91 }
92
93
94
95 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
96 string fname_base;
97 host_tuple res;
98 gs_uint32_t i;
99
100         if(! func.temp_status_received(tup)){  // this operator doesn't write
101 //                      Check for EOF conditions.
102                 if(func.is_eof_tuple()){
103                         if(output_file[0] != NULL){
104                                 for(i=0;i<n_file_streams;++i){
105                                         fclose(output_file[i]);
106                                         output_file[i] = NULL;
107                                         if(do_compression){
108                                                 system(xform_command[i].c_str());
109                                         }
110                                         rename(temp_fname[i].c_str(), fname[i].c_str());
111                                 }
112                         }
113                         if(func.propagate_tuple()){
114                                 tup.channel = output_channel;
115                                 result.push_back(tup);
116                         }
117                         return 0;
118                 }
119
120                 if(func.new_epoch()){
121                         if(output_file[0] != 0){
122 total_written = 0;
123                                 for(i=0;i<n_file_streams;++i){
124                                         fclose(output_file[i]);
125                                         if(do_compression){
126                                                 system(xform_command[i].c_str());
127                                         }else{
128                                         rename(temp_fname[i].c_str(), fname[i].c_str());
129                                         }
130                                 }
131                         }
132
133                         fname_base = func.get_filename_base();
134                         for(i=0;i<n_file_streams;++i){
135                                 if(n_file_streams > 1){
136                                         temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
137                                         fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";
138                                 }else{
139                                         temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.tmp";
140                                         fname[i] = fname_base  + "_stream"+fs_int_to_string(i)+ ".txt";
141                                 }
142                                 if (do_compression) {
143                                         xform_command[i] = "gzip -S .tmpgz "+temp_fname[i]+" ; mv "+temp_fname[i]+".tmpgz "+fname[i]+".gz";
144                                 }
145 //                      else{
146 //                              xform_command = "mv "+temp_fname+" "+fname;
147 //                      }
148                                 if ((output_file[i]=fopen(temp_fname[i].c_str(),"w"))==0) {
149                                         gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());
150                                         exit(1);
151                                 }
152                                 fwrite(file_header.c_str(),strlen(file_header.c_str()),1,output_file[i]);
153                                 fwrite(schema.c_str(),strlen(schema.c_str())+1,1,output_file[i]);
154                         }
155                 }
156
157                 gs_uint32_t sz = tup.tuple_size;
158                 gs_uint32_t nsz = htonl(tup.tuple_size);
159                 gs_uint32_t file_hash = func.output_hash();
160 total_written += sizeof(gs_uint32_t) + sz;
161
162                 if (fwrite(&nsz,sizeof(gs_uint32_t),1,output_file[file_hash])!=1) {
163                         gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());
164                         exit(1);
165                 }
166
167                 if (fwrite(tup.data,sz,1,output_file[file_hash])!=1) {
168                         gslog(LOG_EMERG,"file_output_operator: Could not write %d bytes to output \"%s\" .. EXITING\n", sz, temp_fname[file_hash].c_str());
169                         exit(1);
170                 }
171                 if(func.propagate_tuple()){
172                         tup.channel = output_channel;
173                         result.push_back(tup);
174                 }
175                 return 0;
176         }else{
177 /*
178                 if (!func.create_temp_status_tuple(res)) {
179                         res.channel = output_channel;
180                         result.push_back(res);
181                 }
182 */
183
184 //fprintf(stderr,"\tis temp tuple\n");
185         }
186
187         if(func.propagate_tuple()){
188                 tup.channel = output_channel;
189                 result.push_back(tup);
190         }
191         return 0;
192 }
193
194
195         virtual int flush(list<host_tuple>& result) {
196                 return 0;
197         }
198
199         virtual int set_param_block(int sz, void * value) {
200 //              func.set_param_block(sz, value);
201                 return 0;
202         }
203
204         virtual int get_temp_status(host_tuple& result) {
205                 result.channel = output_channel;
206                 return func.create_temp_status_tuple(result);
207         }
208
209         virtual int get_blocked_status () {
210                 return -1;              // selection operators are not blocked on any input
211         }
212 };
213
214
215 #endif  // FILE_OUTPUT_OPERATOR_H
216