1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
38 #include "schemaparser.h"
42 void fta_init(gs_sp_t device);
43 void rts_fta_process_packet(struct packet * p);
49 #define CSVMAXLINE 1000000
52 static gs_uint8_t in[CHUNK + CSVMAXLINE];
53 static gs_uint8_t out[CHUNK + CSVMAXLINE];
55 #define FILEWAIT_TIMEOUT 10000 // timeout value for getting next file (in microseconds)
57 gs_uint32_t max_field_csv = CSVELEMENTS;
62 #include "bsa_stream.hpp"
63 #include "bsa_util.hpp"
64 BSA::FileStream::ISubStream* stream;
65 BSA::FileStream::IFileHandle* ifh;
66 BSA::FileStream::Reader* reader;
71 static int listensockfd=-1;
73 static struct packet cur_packet;
77 static size_t line_len;
78 static gs_uint32_t lineend=0;
79 static gs_uint8_t csvdel = ',';
80 static gs_uint32_t verbose=0;
81 static gs_uint32_t startupdelay=0;
82 static gs_uint32_t singlefile=0;
83 static gs_uint32_t use_gzip=0;
84 static gs_uint32_t use_bsa=0;
85 static gs_uint32_t gshub=0;
86 static int socket_desc=0;
88 #include "lfta/csv_parser.h"
90 // leftover bytes not consumed at the end of the data chunk
91 gs_uint32_t leftover = 0;
93 uint64_t get_posix_clock_time ()
97 if (clock_gettime (CLOCK_MONOTONIC, &ts) == 0)
98 return (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
104 static void init_inflate() {
107 /* allocate inflate state */
108 strm.zalloc = Z_NULL;
110 strm.opaque = Z_NULL;
112 strm.next_in = Z_NULL;
113 ret = inflateInit2(&strm, 15 /* window bits */ | 32 /* use gzip */);
115 print_error((gs_sp_t)"csv::inflateInit2");
120 static void csv_replay_check_messages() {
121 if (fta_start_service(0)<0) {
122 print_error((gs_sp_t)"Error:in processing the msg queue for a replay file");
127 static gs_int32_t read_chunk_socket(gs_sp_t buffer, gs_uint32_t length){
131 struct timeval socket_timeout;
134 FD_ZERO(&socket_rset);
135 FD_SET(socket_desc,&socket_rset);
136 FD_ZERO(&socket_eset);
137 FD_SET(socket_desc,&socket_eset);
138 // timeout in one millisecond
139 socket_timeout.tv_sec = 0;
140 socket_timeout.tv_usec = 1000;
142 if ((retval = select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
150 if ((r=read(socket_desc, buffer + leftover, length)) <= 0) {
151 print_error((gs_sp_t)"ERROR:could not read data from csv stream");
158 static void init_socket() {
161 struct sockaddr_in server;
162 gs_int32_t parserversion;
163 gs_uint32_t schemalen;
164 static gs_sp_t asciischema=0;
167 if (get_hub(&gshub)!=0) {
168 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
172 if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
173 print_error((gs_sp_t)"ERROR:could not find data source for stream\n");
177 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
178 if (socket_desc == -1)
180 print_error((gs_sp_t)"ERROR:could not create socket for data stream");
183 server.sin_addr.s_addr = srcinfo.ip;
184 server.sin_family = AF_INET;
185 server.sin_port = srcinfo.port;
187 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0) {
188 print_error((gs_sp_t)"ERROR: could not open connection to data source");
193 static void next_file() {
196 fprintf(stderr,"Opening %s\n",name);
198 if (singlefile == 0) {
199 while (lstat(name, &s) != 0) {
200 if (errno != ENOENT) {
201 print_error((gs_sp_t)"csv::lstat unexpected return value");
204 csv_replay_check_messages();
205 usleep(FILEWAIT_TIMEOUT);
211 if ((fd = open(name, O_RDONLY)) < 0) {
212 print_error((gs_sp_t)"csv::open failed ");
215 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
216 if (singlefile == 0) {
226 uint64_t bsa_file_start_time = 0;
227 uint64_t bsa_total_elapsed_time = 0;
229 static void next_file_bsa() {
232 if (bsa_file_start_time) {
233 bsa_total_elapsed_time += (get_posix_clock_time()- bsa_file_start_time);
234 bsa_file_start_time = 0;
237 ifh = stream->getNextFileHandle(FILEWAIT_TIMEOUT / 1000);
242 fprintf(stderr,"%s: Opening %s %s\n", dev, ifh->getHandle().c_str(), stream->getPositionHandle().c_str());
244 bsa_file_start_time = get_posix_clock_time();
245 reader = ifh->openFile();
252 static void close_file_bsa() {
269 static gs_retval_t csv_replay_init(gs_sp_t device)
275 gs_sp_t singlefiletmp;
276 gs_sp_t compressortmp;
279 if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
280 if (strncmp(verbosetmp,"TRUE",4)==0) {
282 fprintf(stderr,"VERBOSE ENABLED\n");
284 fprintf(stderr,"VERBOSE DISABLED\n");
288 if ((name=get_iface_properties(device,(gs_sp_t)"filename"))==0) {
289 print_error((gs_sp_t)"csv_replay_init::No CSV \"Filename\" defined");
292 tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
295 csv_set_delim(csvdel);
298 if ((singlefiletmp=get_iface_properties(device,(gs_sp_t)"singlefile"))!=0) {
299 if (strncmp(singlefiletmp,"TRUE",4)==0) {
302 fprintf(stderr,"SINGLEFILE ENABLED\n");
305 fprintf(stderr,"SINGLEFILE DISABLED\n");
309 if ((compressortmp=get_iface_properties(device,(gs_sp_t)"compressor"))!=0) {
310 if (strncmp(compressortmp,"GZIP",4)==0) {
313 fprintf(stderr,"USING ZLIP COMPRESSOR ENABLED\n");
315 print_error((gs_sp_t)"csv_replay_init::Unknown value for interface property \"Compressor\"");
320 if ((bsatmp=get_iface_properties(device,(gs_sp_t)"bsa"))!=0) {
321 if (strncmp(bsatmp,"TRUE",4)==0) {
323 print_error((gs_sp_t)"csv_replay_init::runtime not built with BSA support to use BSA interfaces");
329 fprintf(stderr,"USING BSA STREAMS\n");
333 if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
335 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
337 startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
339 if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
341 fprintf(stderr,"CSV format using gshub\n");
346 cur_packet.ptype=PTYPE_CSV;
350 static inline int consume_chunk(gs_sp_t chunk, gs_uint32_t chunk_size) {
351 int tuple_consumed = 0;
352 gs_sp_t linepos = chunk;
353 gs_sp_t new_linepos = (gs_sp_t)memchr(linepos + leftover, '\n', chunk_size);
354 gs_sp_t end_pos = chunk + chunk_size + leftover;
355 leftover = chunk_size;
357 while (new_linepos) {
358 // *new_linepos = 0; // terminate the line
359 csv_parse_line(linepos, new_linepos - linepos);
360 rts_fta_process_packet(&cur_packet);
362 linepos = new_linepos + 1;
363 leftover = end_pos - linepos;
364 new_linepos = (gs_sp_t)memchr(linepos, '\n', leftover);
366 memcpy(chunk, linepos, leftover);
368 return tuple_consumed;
371 static int csv_process_chunk(gs_uint32_t chunk_size)
374 gs_uint32_t have = chunk_size;
375 gs_uint32_t tuple_consumed = 0;
378 strm.avail_in = have;
380 /* run inflate() on input until the output buffer is not full */
382 strm.avail_out = CHUNK;
383 strm.next_out = out + leftover;
384 ret = inflate(&strm, Z_NO_FLUSH);
385 /* assert(ret != Z_STREAM_ERROR); state not clobbered */
388 ret = Z_DATA_ERROR; /* and fall through */
391 (void)inflateEnd(&strm);
395 fprintf(stderr,"Error inflating data chunk\n");
398 have = CHUNK - strm.avail_out;
399 tuple_consumed += consume_chunk((gs_sp_t)out, have);
400 } while (strm.avail_out == 0);
401 /* done when inflate() says it's done */
403 if (ret == Z_STREAM_END) {
410 tuple_consumed += consume_chunk((gs_sp_t)out, have);
413 return tuple_consumed;
416 static gs_int32_t csv_read_chunk() {
421 return read_chunk_socket((gs_sp_t)out, CHUNK);
423 gs_sp_t read_pos = (gs_sp_t)(use_gzip ? in : (out + leftover));
427 if (ifh == 0) next_file_bsa();
428 if (ifh == 0) // if no new files available return
429 return -1; // -1 indicates a timeout
431 while ((have = reader->read(read_pos, CHUNK)) == 0) {
436 if (ifh == 0) { // if no new files available return
437 return -1; // -1 indicates a timeout
442 if (fd <= 0) next_file();
443 while ((have = read(fd, read_pos, CHUNK)) == 0) {
446 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
461 static gs_retval_t csv_process_input()
464 static unsigned totalcnt = 0;
467 while(cnt < 50000) { // process up to 50000 tuples at a time
468 retval = csv_read_chunk();
469 if (retval == -1) return 0; // got a timeout so service message queue
471 // we signal that everything is done
473 fprintf(stderr,"Done processing, waiting for things to shut down\n");
475 // now just service message queue until we get killed or lose connectivity
477 fta_start_service(0); // service all waiting messages
478 usleep(1000); // sleep a millisecond
481 cnt += csv_process_chunk((gs_uint32_t)retval);
483 totalcnt = totalcnt + cnt;
486 fprintf(stderr,"%s: Processed %u tuples, rate = %lf tup/sec\n", dev, totalcnt, 1000.0 * (double)totalcnt / (double)bsa_total_elapsed_time);
488 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
494 extern "C" gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
500 csv_replay_init(device);
502 /* initialize host_lib */
504 fprintf(stderr,"Init LFTAs for %s\n",device);
507 if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
508 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
513 fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
515 // set maximum field number to be extracted by csv parser
516 csv_set_maxfield(max_field_csv);
518 cont = startupdelay + time(0);
520 if (verbose) { fprintf(stderr,"Start startup delay"); }
522 while (cont > time(NULL)) {
523 if (fta_start_service(0) < 0) {
524 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
527 usleep(1000); /* sleep for one millisecond */
530 if (verbose) { fprintf(stderr,"... Done\n"); }
532 // open the connection to the data source
533 if (gshub != 0) { init_socket();}
535 // wait to start processing until we get the signal from GSHUB
536 if (get_hub(&mygshub) != 0) {
537 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
540 while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
542 if (fta_start_service(0) < 0) {
543 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
548 /* now we enter an endless loop to process data */
550 fprintf(stderr,"Start processing %s\n",device);
555 stream = BSA::FileStream::ISubStream::construct(std::string(name));
560 st_time = time(NULL);
562 if (csv_process_input() < 0) {
563 fprintf(stderr,"%s::error:in processing records\n", device);
566 /* process all messages on the message queue*/
567 if (fta_start_service(0) < 0) {
568 fprintf(stderr,"%s::error:in processing the msg queue\n", device);