Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / include / hfta / zfile_output_operator.h
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #ifndef ZFILE_OUTPUT_OPERATOR_H
17 #define ZFILE_OUTPUT_OPERATOR_H
18
19 #include <app.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <stdio.h>
23 #include <unistd.h>
24 #include <signal.h>
25
26 #include "gsconfig.h"
27 #include "gstypes.h"
28 #include <schemaparser.h>
29
30 #include "host_tuple.h"
31 #include "base_operator.h"
32 #include <list>
33 #include<string>
34 #include<vector>
35 #include "gsconfig.h"
36 #include "gstypes.h"
37 #include <schemaparser.h>
38
39 #include <zlib.h>
40
41 using namespace std;
42
43 #ifndef FILESTREAM_OPERATOR_UTILS
44 #define FILESTREAM_OPERATOR_UTILS
// Returns the decimal text representation of i (e.g. 7 -> "7", -3 -> "-3").
//
// Declared inline because the definition lives in a header: the
// FILESTREAM_OPERATOR_UTILS guard only prevents a second definition inside
// one translation unit, so without inline every .cpp file that includes this
// header would emit its own external definition and the link would fail.
inline std::string fs_int_to_string(int i){
    char tmpstr[32];    // ample for the sign, digits and NUL of any int
    snprintf(tmpstr, sizeof(tmpstr), "%d", i);
    return std::string(tmpstr);
}
52 #endif
53
54 template <class file_output_functor> class zfile_output_operator :
55                 public base_operator
56 {
57 private :
58         file_output_functor func;
59
60         gzFile *output_file;
61         vector<string> xform_command;
62         vector<string> temp_fname;
63         vector<string> fname;
64         string qname;
65         string schema;
66         string file_header;
67         gs_uint32_t n_file_streams;
68
69 int total_written;
70
71 public:
72
73 zfile_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {
74         char tmpstr[1000];
75         int i;
76
77         n_file_streams = func.num_file_streams();
78         output_file = (gzFile *)(malloc(sizeof(gzFile )*n_file_streams));
79         for(i=0;i<n_file_streams;++i){
80                 output_file[i] = NULL;
81                 xform_command.push_back("");
82                 temp_fname.push_back("");
83                 fname.push_back("");
84         }
85         qname = name;
86         schema = sch;
87         sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);
88         file_header = tmpstr;
89
90 total_written = 0;
91 }
92
93
94
95 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {
96 string fname_base;
97 host_tuple res;
98 gs_uint32_t i;
99
100         if(! func.temp_status_received(tup)){  // this operator doesn't write
101 //                      Check for EOF conditions.
102                 if(func.is_eof_tuple()){
103                         if(output_file[0] != NULL){
104                                 for(i=0;i<n_file_streams;++i){
105                                         gzclose(output_file[i]);
106                                         output_file[i] = NULL;
107                                         rename(temp_fname[i].c_str(), fname[i].c_str());
108                                 }
109                         }
110                         if(func.propagate_tuple()){
111                                 tup.channel = output_channel;
112                                 result.push_back(tup);
113                         }
114                         return 0;
115                 }
116
117                 if(func.new_epoch()){
118                         if(output_file[0] != 0){
119 total_written = 0;
120                                 for(i=0;i<n_file_streams;++i){
121                                         gzclose(output_file[i]);
122                                         rename(temp_fname[i].c_str(), fname[i].c_str());
123                                 }
124                         }
125
126                         fname_base = func.get_filename_base();
127                         for(i=0;i<n_file_streams;++i){
128                                 if(n_file_streams > 1){
129                                         temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.gz.tmp";
130                                         fname[i] = fname_base + "_stream"+fs_int_to_string(i) + ".txt.gz";
131                                 }else{
132                                         temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.gz.tmp";
133                                         fname[i] = fname_base + "_stream"+fs_int_to_string(i) + ".txt.gz";
134                                 }
135 //                      else{
136 //                              xform_command = "mv "+temp_fname+" "+fname;
137 //                      }
138                                 if ((output_file[i]=gzopen(temp_fname[i].c_str(),"wb"))==0) {
139                                         gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());
140                                         exit(1);
141                                 }
142                                 gzwrite(output_file[i],file_header.c_str(),strlen(file_header.c_str()));
143                                 gzwrite(output_file[i],schema.c_str(),strlen(schema.c_str())+1);
144                         }
145                 }
146
147                 gs_uint32_t sz = tup.tuple_size;
148                 gs_uint32_t nsz = htonl(tup.tuple_size);
149                 gs_uint32_t file_hash = func.output_hash();
150 total_written += sizeof(gs_uint32_t) + sz;
151                 if (gzwrite(output_file[file_hash],&nsz,sizeof(gs_uint32_t))==0) {
152                         gslog(LOG_EMERG,"zfile_output_operator: Could not write %d bytes to output\"%s\".. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());
153                         exit(1);
154                 }
155
156                 if (gzwrite(output_file[file_hash],tup.data,sz)==0) {
157                         gslog(LOG_EMERG,"zfile_output_operator: Could not write %d bytes to output\"%s\".. EXITING\n", sz, temp_fname[file_hash].c_str());
158                         exit(1);
159                 }
160                 if(func.propagate_tuple()){
161                         tup.channel = output_channel;
162                         result.push_back(tup);
163                 }
164                 return 0;
165         }else{
166 /*
167                 if (!func.create_temp_status_tuple(res)) {
168                         res.channel = output_channel;
169                         result.push_back(res);
170                 }
171 */
172
173 //fprintf(stderr,"\tis temp tuple\n");
174         }
175
176         if(func.propagate_tuple()){
177                 tup.channel = output_channel;
178                 result.push_back(tup);
179         }
180         return 0;
181 }
182
183
184         virtual int flush(list<host_tuple>& result) {
185                 return 0;
186         }
187
188         virtual int set_param_block(int sz, void * value) {
189 //              func.set_param_block(sz, value);
190                 return 0;
191         }
192
193         virtual int get_temp_status(host_tuple& result) {
194                 result.channel = output_channel;
195                 return func.create_temp_status_tuple(result);
196         }
197
198         virtual int get_blocked_status () {
199                 return -1;              // selection operators are not blocked on any input
200         }
201 };
202
203
204 #endif  // ZFILE_OUTPUT_OPERATOR_H
205