1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
38 #include "schemaparser.h"
// Entry points implemented outside this file: fta_init() is called once from
// main_csv(), and rts_fta_process_packet() is invoked for every parsed CSV line.
42 void fta_init(gs_sp_t device);
43 void rts_fta_process_packet(struct packet * p);
// Upper bound on the length of a single CSV line. Both buffers reserve this much
// room beyond one CHUNK so that the unfinished line carried over from the previous
// chunk (see `leftover`) can stay at the front while a full CHUNK is appended.
49 #define CSVMAXLINE 1000000
// `in` receives the compressed bytes when the GZIP compressor is enabled;
// `out` holds the plain-text CSV data that is split into lines and parsed.
52 static gs_uint8_t in[CHUNK + CSVMAXLINE];
53 static gs_uint8_t out[CHUNK + CSVMAXLINE];
// Also passed (divided by 1000) to the BSA getNextFileHandle() call.
55 #define FILEWAIT_TIMEOUT 10000 // timeout value for getting next file (in microseconds)
// Highest CSV field position extracted by the parser; overridable through the
// "_max_csv_pos" interface property and handed to csv_set_maxfield().
57 gs_uint32_t max_field_csv = CSVELEMENTS;
62 #include "bsa_stream.hpp"
63 #include "bsa_util.hpp"
// BSA input state: `stream` is constructed from the "filename" property in
// main_csv(), `ifh` is the file currently being replayed (0 when none is open),
// and `reader` reads that file's contents in CHUNK-sized pieces.
64 BSA::FileStream::ISubStream* stream;
65 BSA::FileStream::IFileHandle* ifh;
66 BSA::FileStream::Reader* reader;
71 #include <openssl/pem.h>
72 #include <openssl/x509.h>
73 #include <openssl/x509v3.h>
74 #include <openssl/ssl.h>
75 #include <openssl/crypto.h>
76 #include <openssl/err.h>
83 // callback for passing password to private key reader
// OpenSSL pem_password_cb used by PEM_read_PrivateKey() in csv_replay_init():
// copies the password previously loaded into `pwd` into `buf`. OpenSSL expects
// the callback to return the number of bytes it wrote.
// NOTE(review): `len` is not compared with `size` before the memcpy, so a password
// longer than OpenSSL's buffer would overflow `buf`; clamp len to size.
84 int pass_cb(char *buf, int size, int rwflag, void *u) {
85 int len = strlen(pwd);
86 memcpy(buf, pwd, len);
94 static int listensockfd=-1;
// Packet handed to rts_fta_process_packet() for every parsed line
// (its ptype is set to PTYPE_CSV in csv_replay_init()).
96 static struct packet cur_packet;
// Directory replay state: the "directoryname" property, the scandir() entries
// and their count (consumed one entry per next_file() call).
98 static gs_sp_t dir_name;
99 struct dirent **namelist;
100 static gs_int32_t num_dir_files;
103 static size_t line_len;
104 static gs_uint32_t lineend=0;
// Field separator passed to csv_set_delim() (default ',').
105 static gs_uint8_t csvdel = ',';
// Options configured from interface properties in csv_replay_init().
106 static gs_uint32_t verbose=0;
107 static gs_uint32_t startupdelay=0;
108 static gs_uint32_t singlefile=0;
109 static gs_uint32_t use_gzip=0;
110 static gs_uint32_t use_bsa=0;
111 static gs_uint32_t use_decryption=0;
// Nonzero enables gshub socket input: main_csv() then calls init_socket(), which
// stores the hub handle here via get_hub() and the connected socket in socket_desc.
112 static gs_uint32_t gshub=0;
113 static int socket_desc=0;
115 #include "lfta/csv_parser.h"
117 // Length of the unfinished trailing line carried over from the previous data chunk.
// consume_chunk() moves those bytes to the start of the buffer, and the next read
// or inflate appends new data directly after them.
118 gs_uint32_t leftover = 0;
// Returns the CLOCK_MONOTONIC time in milliseconds; used to time BSA file processing.
// NOTE(review): tv_sec * 1000 is evaluated in time_t before the cast to uint64_t;
// cast tv_sec first to avoid overflow on platforms with a 32-bit time_t.
120 uint64_t get_posix_clock_time ()
124 if (clock_gettime (CLOCK_MONOTONIC, &ts) == 0)
125 return (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
// Initializes the zlib stream `strm` that csv_process_chunk() inflates from;
// inflateInit2() failures are reported through print_error().
131 static void init_inflate() {
134 /* allocate inflate state */
135 strm.zalloc = Z_NULL;
137 strm.opaque = Z_NULL;
139 strm.next_in = Z_NULL;
140 ret = inflateInit2(&strm, 15 /* window bits */ | 32 /* auto-detect zlib or gzip header */);
142 print_error((gs_sp_t)"csv::inflateInit2");
// Services the waiting messages on the message queue once, so the runtime stays
// responsive while next_file() waits for an input file; failures go to print_error().
147 static void csv_replay_check_messages() {
148 if (fta_start_service(0)<0) {
149 print_error((gs_sp_t)"Error:in processing the msg queue for a replay file");
// Reads up to `length` bytes from the gshub data socket into `buffer`, placing them
// after the `leftover` bytes kept from the previous chunk. Waits at most one
// millisecond for the socket to become readable.
// NOTE(review): presumably returns -1 when select() times out, which
// csv_process_input() treats as "go service the message queue"; confirm.
// NOTE(review): read() returning 0 (peer closed the connection) is handled
// exactly like a read error.
154 static gs_int32_t read_chunk_socket(gs_sp_t buffer, gs_uint32_t length){
158 struct timeval socket_timeout;
161 FD_ZERO(&socket_rset);
162 FD_SET(socket_desc,&socket_rset);
163 FD_ZERO(&socket_eset);
164 FD_SET(socket_desc,&socket_eset);
165 // wait at most one millisecond for data
166 socket_timeout.tv_sec = 0;
167 socket_timeout.tv_usec = 1000;
169 if ((retval = select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
177 if ((r=read(socket_desc, buffer + leftover, length)) <= 0) {
178 print_error((gs_sp_t)"ERROR:could not read data from csv stream");
// Looks up the data source that gshub registered for stream `name` and opens a
// TCP connection to it in `socket_desc`; each failure is reported via print_error().
// NOTE(review): srcinfo.ip and srcinfo.port are stored into sockaddr_in unchanged,
// so they must already be in network byte order; confirm (otherwise use htonl/htons).
185 static void init_socket() {
188 struct sockaddr_in server;
189 gs_int32_t parserversion;
190 gs_uint32_t schemalen;
191 static gs_sp_t asciischema=0;
194 if (get_hub(&gshub)!=0) {
195 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
199 if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
200 print_error((gs_sp_t)"ERROR:could not find data source for stream\n");
204 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
205 if (socket_desc == -1)
207 print_error((gs_sp_t)"ERROR:could not create socket for data stream");
210 server.sin_addr.s_addr = srcinfo.ip;
211 server.sin_family = AF_INET;
212 server.sin_port = srcinfo.port;
214 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0) {
215 print_error((gs_sp_t)"ERROR: could not open connection to data source");
// Opens the next input file (`name`) into `fd`, or, with decryption enabled, decodes
// it into the BIO `mem_io`. In directory mode the entries come from a single
// scandir() call sorted with alphasort and are consumed one per call. In file mode
// the function waits for the file to appear unless singlefile is set.
// NOTE(review): `buf` is CSVMAXLINE bytes (about 1 MB) on the stack, and the path is
// built with an unbounded sprintf(); snprintf(buf, sizeof buf, ...) would be safer.
220 static void next_file() {
222 static gs_uint32_t file_pos = 0;
223 static gs_uint32_t scan_finished = 0;
225 char buf[CSVMAXLINE];
230 fprintf(stderr,"Done processing, waiting for things to shut down\n");
232 // now just service message queue until we get killed or lose connectivity
234 fta_start_service(0); // service all waiting messages
235 usleep(1000); // sleep a millisecond
238 if (num_dir_files) { // we already started directory scan
240 if (file_pos < num_dir_files) {
241 sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
243 free(namelist[file_pos]);
251 num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
252 if (num_dir_files == -1) {
254 print_error((gs_sp_t)"ERROR: Unable to scan directory");
257 if (num_dir_files == 2) { // only "." and ".." are present, so the directory is empty
265 sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
267 free(namelist[file_pos]);
274 fprintf(stderr,"Opening %s\n",name);
// wait for the file to appear unless replaying a single fixed file
276 if (singlefile == 0) {
277 while (lstat(name, &s) != 0) {
278 if (errno != ENOENT) {
279 print_error((gs_sp_t)"csv::lstat unexpected return value");
282 csv_replay_check_messages();
283 usleep(FILEWAIT_TIMEOUT);
289 if ((fd = open(name, O_RDONLY)) < 0) {
290 print_error((gs_sp_t)"csv::open failed ");
// hint to the kernel that the file will be read sequentially
293 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
296 if (use_decryption) {
297 // free SSL resources
// parse the DER-encoded PKCS#7 (S/MIME) message and decrypt it with private key `rkey`
303 FILE *fp = fdopen(fd, "r");
304 p7 = d2i_PKCS7_fp(fp, NULL);
306 print_error((gs_sp_t)"Error reading SMIME message from file");
310 if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
311 print_error((gs_sp_t)"Error decoding PKCS7 file\n");
318 if (!dir_name && !singlefile) {
// Millisecond timestamps from get_posix_clock_time(), used to report the BSA
// processing rate. The first is the start time of the currently open file (0 when
// none); the second is the accumulated time spent on previously finished files.
328 uint64_t bsa_file_start_time = 0;
329 uint64_t bsa_total_elapsed_time = 0;
// Advances to the next BSA file. It adds the time spent on the previous file to
// bsa_total_elapsed_time, asks `stream` for the next file handle, and opens a
// reader on it while restarting the per-file timer.
// NOTE(review): FILEWAIT_TIMEOUT is in microseconds; dividing by 1000 presumably
// yields the milliseconds getNextFileHandle() expects; confirm against the BSA API.
331 static void next_file_bsa() {
334 if (bsa_file_start_time) {
335 bsa_total_elapsed_time += (get_posix_clock_time()- bsa_file_start_time);
336 bsa_file_start_time = 0;
339 ifh = stream->getNextFileHandle(FILEWAIT_TIMEOUT / 1000);
344 fprintf(stderr,"%s: Opening %s %s\n", dev, ifh->getHandle().c_str(), stream->getPositionHandle().c_str());
346 bsa_file_start_time = get_posix_clock_time();
347 reader = ifh->openFile();
// Presumably releases the BSA file and reader opened by next_file_bsa(); confirm.
354 static void close_file_bsa() {
// Configures the CSV replay from the interface properties of `device`. It reads
// verbose, filename/directoryname, csvseparator, singlefile, compressor (GZIP), bsa,
// startupdelay (seconds), _max_csv_pos and gshub. For encrypted input it also loads
// the password and private key. BSA and encrypted input require a runtime built
// with that support; configuration errors are reported through print_error().
// NOTE(review): boolean properties are tested with strncmp(..., "TRUE", 4), a
// case-sensitive prefix match (e.g. "TRUEX" also enables the option).
371 static gs_retval_t csv_replay_init(gs_sp_t device)
377 gs_sp_t singlefiletmp;
378 gs_sp_t compressortmp;
380 gs_sp_t encryptedtmp;
387 if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
388 if (strncmp(verbosetmp,"TRUE",4)==0) {
390 fprintf(stderr,"VERBOSE ENABLED\n");
392 fprintf(stderr,"VERBOSE DISABLED\n");
396 name=get_iface_properties(device,(gs_sp_t)"filename");
397 dir_name=get_iface_properties(device,(gs_sp_t)"directoryname");
398 if (!name && !dir_name) {
// NOTE(review): the message says "Dirname" but the property read above is "directoryname".
399 print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined");
403 tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
406 csv_set_delim(csvdel);
409 if ((singlefiletmp=get_iface_properties(device,(gs_sp_t)"singlefile"))!=0) {
410 if (strncmp(singlefiletmp,"TRUE",4)==0) {
413 fprintf(stderr,"SINGLEFILE ENABLED\n");
416 fprintf(stderr,"SINGLEFILE DISABLED\n");
420 if ((compressortmp=get_iface_properties(device,(gs_sp_t)"compressor"))!=0) {
421 if (strncmp(compressortmp,"GZIP",4)==0) {
// NOTE(review): "ZLIP" in this message is presumably a typo for ZLIB/GZIP.
424 fprintf(stderr,"USING ZLIP COMPRESSOR ENABLED\n");
426 print_error((gs_sp_t)"csv_replay_init::Unknown value for interface property \"Compressor\"");
431 if ((bsatmp=get_iface_properties(device,(gs_sp_t)"bsa"))!=0) {
432 if (strncmp(bsatmp,"TRUE",4)==0) {
434 print_error((gs_sp_t)"csv_replay_init::runtime not built with BSA support to use BSA interfaces");
440 fprintf(stderr,"USING BSA STREAMS\n");
444 if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
// NOTE(review): atoi() returns int but is printed with %u, and malformed input
// silently becomes 0; strtoul() with validation would be safer.
446 fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
448 startupdelay=atoi(delaytmp);
451 if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
452 max_field_csv=atoi(maxfieldtmp);
455 if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
457 fprintf(stderr,"CSV format using gshub\n");
461 print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
466 pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
467 pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
469 if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
470 if (strncmp(encryptedtmp,"TRUE",4)==0) {
472 print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");
477 fprintf(stderr,"CSV file is encrypted\n");
479 if (!pkey_fname || !pwd_fname) {
// NOTE(review): "itnerface" in this message is a typo for "interface".
480 print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface");
484 OpenSSL_add_all_algorithms();
485 ERR_load_crypto_strings();
487 // Read password file
488 FILE* in_fd = fopen(pwd_fname, "r");
490 fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
// NOTE(review): assumes `pwd` holds at least CSVMAXLINE bytes; confirm its declaration.
494 if (!fgets(pwd, CSVMAXLINE, in_fd)) {
495 fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
// cut the password at the first CR, LF, tab or space after it; this removes the
// newline kept by fgets() but also truncates passwords that contain whitespace
498 strtok(pwd, "\r\n\t ");
501 // Read the private key
502 in_fd = fopen(pkey_fname, "r");
504 fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
// pass_cb supplies the password loaded above if the key is encrypted
508 rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
510 fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
519 cur_packet.ptype=PTYPE_CSV;
// Splits `chunk` into newline-terminated lines and feeds each one to csv_parse_line()
// followed by rts_fta_process_packet(&cur_packet). On entry the first `leftover`
// bytes of `chunk` are an unfinished line from the previous call, and `chunk_size`
// new bytes follow them; only the new bytes are searched for the first '\n'. On exit
// the unfinished tail has been copied to the front of `chunk` and its length stored
// in `leftover`. Returns the count accumulated in tuple_consumed.
// NOTE(review): when the new bytes contain no '\n', leftover is set to chunk_size
// although the unfinished line is leftover + chunk_size bytes long, so bytes carried
// over from earlier chunks get overwritten by the next append.
// NOTE(review): the source and destination of the final memcpy may overlap (they
// are identical when no newline was found), which is undefined for memcpy; use memmove.
523 static inline int consume_chunk(gs_sp_t chunk, gs_uint32_t chunk_size) {
524 int tuple_consumed = 0;
525 gs_sp_t linepos = chunk;
526 gs_sp_t new_linepos = (gs_sp_t)memchr(linepos + leftover, '\n', chunk_size);
527 gs_sp_t end_pos = chunk + chunk_size + leftover;
528 leftover = chunk_size;
530 while (new_linepos) {
531 // *new_linepos = 0; // terminate the line
532 csv_parse_line(linepos, new_linepos - linepos);
533 rts_fta_process_packet(&cur_packet);
535 linepos = new_linepos + 1;
536 leftover = end_pos - linepos;
537 new_linepos = (gs_sp_t)memchr(linepos, '\n', leftover);
539 memcpy(chunk, linepos, leftover);
541 return tuple_consumed;
// Parses `chunk_size` bytes of freshly read input and returns the number of tuples
// reported by consume_chunk(). With GZIP enabled, the compressed bytes (presumably
// in `in`) are inflated into `out`, one CHUNK at a time, directly after the
// carried-over `leftover` bytes. Each inflated piece is passed to consume_chunk();
// the loop follows the structure of zlib's zpipe.c example.
544 static int csv_process_chunk(gs_uint32_t chunk_size)
547 gs_uint32_t have = chunk_size;
548 gs_uint32_t tuple_consumed = 0;
551 strm.avail_in = have;
553 /* run inflate() on input until output buffer not full */
555 strm.avail_out = CHUNK;
556 strm.next_out = out + leftover;
557 ret = inflate(&strm, Z_NO_FLUSH);
558 /* assert(ret != Z_STREAM_ERROR); state not clobbered */
561 ret = Z_DATA_ERROR; /* and fall through */
564 (void)inflateEnd(&strm);
568 fprintf(stderr,"Error inflating data chunk\n");
571 have = CHUNK - strm.avail_out;
572 tuple_consumed += consume_chunk((gs_sp_t)out, have);
573 } while (strm.avail_out == 0);
574 /* done when inflate() says it's done */
576 if (ret == Z_STREAM_END) {
// presumably the uncompressed path: the raw bytes already sit in `out` after `leftover`
583 tuple_consumed += consume_chunk((gs_sp_t)out, have);
586 return tuple_consumed;
// Reads the next chunk of input and returns its byte count, or -1 when no data is
// available yet (csv_process_input() then returns so the message queue is serviced).
// Possible sources are the gshub socket, BSA files, the decrypted PKCS#7 BIO, or the
// plain file descriptor. Compressed input is read into `in`; plain input is appended
// to `out` after the `leftover` bytes of the previous chunk.
// NOTE(review): `fd <= 0` treats descriptor 0 as "no file open", although open() can
// legitimately return 0 when stdin is closed.
// NOTE(review): the read loops only test for 0; confirm that negative (error) returns
// from read()/BIO_read() are handled after the loops.
589 static gs_int32_t csv_read_chunk() {
594 return read_chunk_socket((gs_sp_t)out, CHUNK);
596 gs_sp_t read_pos = (gs_sp_t)(use_gzip ? in : (out + leftover));
600 if (ifh == 0) next_file_bsa();
601 if (ifh == 0) // if no new files available return
602 return -1; // -1 indicates a timeout
604 while ((have = reader->read(read_pos, CHUNK)) == 0) {
609 if (ifh == 0) { // if no new files available return
610 return -1; // -1 indicates a timeout
615 if (fd <= 0) next_file();
618 if (use_decryption) {
620 while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
623 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
634 while ((have = read(fd, read_pos, CHUNK)) == 0) {
637 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
// Reads and parses chunks until at least 50000 tuples have been processed, then
// adds them to the running total and prints the throughput (presumably only when
// verbose). Returns 0 early when csv_read_chunk() times out, so the caller can
// service the message queue.
// NOTE(review): both rate computations divide by an elapsed time that can be 0
// (the BSA total before any file has finished, or time(NULL) - st_time within the
// first second), which prints inf.
655 static gs_retval_t csv_process_input()
658 static unsigned totalcnt = 0;
661 while(cnt < 50000) { // process about 50000 tuples per call; whole chunks are parsed, so cnt may overshoot
662 retval = csv_read_chunk();
663 if (retval == -1) return 0; // got a timeout so service message queue
665 // we signal that everything is done
667 fprintf(stderr,"Done processing, waiting for things to shut down\n");
669 // now just service message queue until we get killed or lose connectivity
671 fta_start_service(0); // service all waiting messages
672 usleep(1000); // sleep a millisecond
675 cnt += csv_process_chunk((gs_uint32_t)retval);
677 totalcnt = totalcnt + cnt;
680 fprintf(stderr,"%s: Processed %u tuples, rate = %lf tup/sec\n", dev, totalcnt, 1000.0 * (double)totalcnt / (double)bsa_total_elapsed_time);
682 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
688 extern "C" gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
694 csv_replay_init(device);
696 /* initalize host_lib */
698 fprintf(stderr,"Init LFTAs for %s\n",device);
701 if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
702 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
707 fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
709 // set maximum field nubmer to be extracted by csv parser
710 csv_set_maxfield(max_field_csv);
712 cont = startupdelay + time(0);
714 if (verbose) { fprintf(stderr,"Start startup delay"); }
716 while (cont > time(NULL)) {
717 if (fta_start_service(0) < 0) {
718 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
721 usleep(1000); /* sleep for one millisecond */
724 if (verbose) { fprintf(stderr,"... Done\n"); }
726 // open the connection to the data source
727 if (gshub != 0) { init_socket();}
729 // wait to process till we get the signal from GSHUB
730 if (get_hub(&mygshub) != 0) {
731 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
734 while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
736 if (fta_start_service(0) < 0) {
737 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
742 /* now we enter an endless loop to process data */
744 fprintf(stderr,"Start processing %s\n",device);
749 stream = BSA::FileStream::ISubStream::construct(std::string(name));
754 st_time = time(NULL);
756 if (csv_process_input() < 0) {
757 fprintf(stderr,"%s::error:in processing records\n", device);
760 /* process all messages on the message queue*/
761 if (fta_start_service(0) < 0) {
762 fprintf(stderr,"%s::error:in processing the msg queue\n", device);