Added quantiling UDAFs
[com/gs-lite.git] / include / hfta / zfile_output_operator.h
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #ifndef ZFILE_OUTPUT_OPERATOR_H\r
17 #define ZFILE_OUTPUT_OPERATOR_H\r
18 \r
19 #include <app.h>\r
20 #include <stdlib.h>\r
21 #include <string.h>\r
22 #include <stdio.h>\r
23 #include <unistd.h>\r
24 #include <signal.h>\r
25 \r
26 #include "gsconfig.h"\r
27 #include "gstypes.h"\r
28 #include <schemaparser.h>\r
29 \r
30 #include "host_tuple.h"\r
31 #include "base_operator.h"\r
32 #include <list>\r
33 #include<string>\r
34 #include<vector>\r
35 #include "gsconfig.h"\r
36 #include "gstypes.h"\r
37 #include <schemaparser.h>\r
38 \r
39 #include <zlib.h>\r
40 \r
41 using namespace std;\r
42 \r
#ifndef FILESTREAM_OPERATOR_UTILS
#define FILESTREAM_OPERATOR_UTILS
// The guard presumably lets several file-output headers share this helper
// within one translation unit -- confirm against the sibling headers.

/// Converts an int to its decimal string representation (e.g. -7 -> "-7").
///
/// Declared `inline` because it is defined in a header: without it, every
/// translation unit that includes this header emits its own strong
/// definition and the link fails with a multiple-definition error.
inline std::string fs_int_to_string(int i){
        // 32 bytes comfortably holds any int in decimal, including the sign.
        char tmpstr[32];
        snprintf(tmpstr, sizeof(tmpstr), "%d", i);
        return std::string(tmpstr);
}
#endif
53 \r
54 template <class file_output_functor> class zfile_output_operator :\r
55                 public base_operator\r
56 {\r
57 private :\r
58         file_output_functor func;\r
59 \r
60         gzFile *output_file;\r
61         vector<string> xform_command;\r
62         vector<string> temp_fname;\r
63         vector<string> fname;\r
64         string qname;\r
65         string schema;\r
66         string file_header;\r
67         gs_uint32_t n_file_streams;\r
68 \r
69 int total_written;\r
70 \r
71 public:\r
72 \r
73 zfile_output_operator(int schema_handle, gs_csp_t name, gs_csp_t sch) : base_operator(name), func(schema_handle) {\r
74         char tmpstr[1000];\r
75         int i;\r
76 \r
77         n_file_streams = func.num_file_streams();\r
78         output_file = (gzFile *)(malloc(sizeof(gzFile )*n_file_streams));\r
79         for(i=0;i<n_file_streams;++i){\r
80                 output_file[i] = NULL;\r
81                 xform_command.push_back("");\r
82                 temp_fname.push_back("");\r
83                 fname.push_back("");\r
84         }\r
85         qname = name;\r
86         schema = sch;\r
87         sprintf(tmpstr,"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",get_schemaparser_version(),strlen(sch)+1);\r
88         file_header = tmpstr;\r
89 \r
90 total_written = 0;\r
91 }\r
92 \r
93 \r
94 \r
95 virtual int accept_tuple(host_tuple& tup, list<host_tuple>& result) {\r
96 string fname_base;\r
97 host_tuple res;\r
98 gs_uint32_t i;\r
99 \r
100         if(! func.temp_status_received(tup)){  // this operator doesn't write\r
101 //                      Check for EOF conditions.\r
102                 if(func.is_eof_tuple()){\r
103                         if(output_file[0] != NULL){\r
104                                 for(i=0;i<n_file_streams;++i){\r
105                                         gzclose(output_file[i]);\r
106                                         output_file[i] = NULL;\r
107                                         rename(temp_fname[i].c_str(), fname[i].c_str());\r
108                                 }\r
109                         }\r
110                         if(func.propagate_tuple()){\r
111                                 tup.channel = output_channel;\r
112                                 result.push_back(tup);\r
113                         }\r
114                         return 0;\r
115                 }\r
116 \r
117                 if(func.new_epoch()){\r
118                         if(output_file[0] != 0){\r
119 total_written = 0;\r
120                                 for(i=0;i<n_file_streams;++i){\r
121                                         gzclose(output_file[i]);\r
122                                         rename(temp_fname[i].c_str(), fname[i].c_str());\r
123                                 }\r
124                         }\r
125 \r
126                         fname_base = func.get_filename_base();\r
127                         for(i=0;i<n_file_streams;++i){\r
128                                 if(n_file_streams > 1){\r
129                                         temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.gz.tmp";\r
130                                         fname[i] = fname_base + "_stream"+fs_int_to_string(i) + ".txt.gz";\r
131                                 }else{\r
132                                         temp_fname[i] = fname_base + "_stream"+fs_int_to_string(i)+ ".txt.gz.tmp";\r
133                                         fname[i] = fname_base + "_stream"+fs_int_to_string(i) + ".txt.gz";\r
134                                 }\r
135 //                      else{\r
136 //                              xform_command = "mv "+temp_fname+" "+fname;\r
137 //                      }\r
138                                 if ((output_file[i]=gzopen(temp_fname[i].c_str(),"wb"))==0) {\r
139                                         gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", temp_fname[i].c_str());\r
140                                         exit(1);\r
141                                 }\r
142                                 gzwrite(output_file[i],file_header.c_str(),strlen(file_header.c_str()));\r
143                                 gzwrite(output_file[i],schema.c_str(),strlen(schema.c_str())+1);\r
144                         }\r
145                 }\r
146 \r
147                 gs_uint32_t sz = tup.tuple_size;\r
148                 gs_uint32_t nsz = htonl(tup.tuple_size);\r
149                 gs_uint32_t file_hash = func.output_hash();\r
150 total_written += sizeof(gs_uint32_t) + sz;\r
151                 if (gzwrite(output_file[file_hash],&nsz,sizeof(gs_uint32_t))==0) {\r
152                         gslog(LOG_EMERG,"zfile_output_operator: Could not write %d bytes to output\"%s\".. EXITING\n", sizeof(gs_uint32_t), temp_fname[file_hash].c_str());\r
153                         exit(1);\r
154                 }\r
155 \r
156                 if (gzwrite(output_file[file_hash],tup.data,sz)==0) {\r
157                         gslog(LOG_EMERG,"zfile_output_operator: Could not write %d bytes to output\"%s\".. EXITING\n", sz, temp_fname[file_hash].c_str());\r
158                         exit(1);\r
159                 }\r
160                 if(func.propagate_tuple()){\r
161                         tup.channel = output_channel;\r
162                         result.push_back(tup);\r
163                 }\r
164                 return 0;\r
165         }else{\r
166 /*\r
167                 if (!func.create_temp_status_tuple(res)) {\r
168                         res.channel = output_channel;\r
169                         result.push_back(res);\r
170                 }\r
171 */\r
172 \r
173 //fprintf(stderr,"\tis temp tuple\n");\r
174         }\r
175 \r
176         if(func.propagate_tuple()){\r
177                 tup.channel = output_channel;\r
178                 result.push_back(tup);\r
179         }\r
180         return 0;\r
181 }\r
182 \r
183 \r
184         virtual int flush(list<host_tuple>& result) {\r
185                 return 0;\r
186         }\r
187 \r
188         virtual int set_param_block(int sz, void * value) {\r
189 //              func.set_param_block(sz, value);\r
190                 return 0;\r
191         }\r
192 \r
193         virtual int get_temp_status(host_tuple& result) {\r
194                 result.channel = output_channel;\r
195                 return func.create_temp_status_tuple(result);\r
196         }\r
197 \r
198         virtual int get_blocked_status () {\r
199                 return -1;              // selection operators are not blocked on any input\r
200         }\r
201 };\r
202 \r
203 \r
204 #endif  // ZFILE_OUTPUT_OPERATOR_H\r
205 \r