1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
38 #include "schemaparser.h"
42 void fta_init(gs_sp_t device);
43 void rts_fta_process_packet(struct packet * p);
49 #define CSVMAXLINE 1000000
52 static gs_uint8_t in[CHUNK + CSVMAXLINE];
53 static gs_uint8_t out[CHUNK + CSVMAXLINE];
55 #define FILEWAIT_TIMEOUT 10000 // timeout value for getting next file (in microseconds)
57 gs_uint32_t max_field_csv = CSVELEMENTS;
62 #include "bsa_stream.hpp"
63 #include "bsa_util.hpp"
64 BSA::FileStream::ISubStream* stream;
65 BSA::FileStream::IFileHandle* ifh;
66 BSA::FileStream::Reader* reader;
71 #include <openssl/pem.h>
72 #include <openssl/x509.h>
73 #include <openssl/x509v3.h>
74 #include <openssl/ssl.h>
75 #include <openssl/crypto.h>
76 #include <openssl/err.h>
83 // callback for passing password to private key reader
84 int pass_cb(char *buf, int size, int rwflag, void *u) {
85 int len = strlen(pwd);
86 memcpy(buf, pwd, len);
94 static int listensockfd=-1;
96 static struct packet cur_packet;
98 static gs_sp_t dir_name;
99 struct dirent **namelist;
100 static gs_int32_t num_dir_files;
103 static size_t line_len;
104 static gs_uint32_t lineend=0;
105 static gs_uint8_t csvdel = ',';
106 static gs_uint32_t verbose=0;
107 static gs_uint32_t startupdelay=0;
108 static gs_uint32_t singlefile=0;
109 static gs_uint32_t use_gzip=0;
110 static gs_uint32_t use_bsa=0;
111 static gs_uint32_t use_decryption=0;
112 static gs_uint32_t gshub=0;
113 static int socket_desc=0;
115 #include "lfta/csv_parser.h"
117 // leftover bytes not consumed at the end of the data chunk
118 gs_uint32_t leftover = 0;
/*
 * Current CLOCK_MONOTONIC time in milliseconds, or 0 if clock_gettime() fails.
 * The epoch of CLOCK_MONOTONIC is unspecified, so values are only meaningful
 * as differences (used here to time the processing of BSA files).
 */
uint64_t get_posix_clock_time ()
{
	struct timespec ts;

	if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
		return 0;
	// Widen before multiplying: with a 32-bit time_t, tv_sec * 1000 overflows
	// (signed overflow, undefined behaviour) after ~24.8 days of clock time.
	return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u;
}
// Prepare the global zlib stream `strm` for inflating compressed input.
// Z_NULL allocator/opaque fields select zlib's default memory functions.
// windowBits 15 is the maximum window; per the zlib manual, adding 32 makes
// inflate() auto-detect a zlib or gzip header.
131 static void init_inflate() {
134 /* allocate inflate state */
135 strm.zalloc = Z_NULL;
137 strm.opaque = Z_NULL;
139 strm.next_in = Z_NULL;
140 ret = inflateInit2(&strm, 15 /* window bits */ | 32 /* use gzip */);
// NOTE(review): presumably reached only when ret != Z_OK -- confirm
142 print_error((gs_sp_t)"csv::inflateInit2");
// Service pending message-queue requests once via fta_start_service(0).
// next_file() calls this while polling for an input file to appear.
// A negative return from fta_start_service() is reported through print_error.
147 static void csv_replay_check_messages() {
148 if (fta_start_service(0)<0) {
149 print_error((gs_sp_t)"Error:in processing the msg queue for a replay file");
// Read the next piece of the gshub data stream from socket_desc.
// Waits at most 1 ms in select() for the socket to become readable (or to
// report an exception), then reads up to `length` bytes into buffer + leftover,
// i.e. after the partial record carried over from the previous chunk.
// A read() result <= 0 is reported through print_error.
// NOTE(review): csv_process_input treats -1 from csv_read_chunk as "timeout,
// service the message queue"; confirm the select() <= 0 path returns -1.
154 static gs_int32_t read_chunk_socket(gs_sp_t buffer, gs_uint32_t length){
158 struct timeval socket_timeout;
161 FD_ZERO(&socket_rset);
162 FD_SET(socket_desc,&socket_rset);
163 FD_ZERO(&socket_eset);
164 FD_SET(socket_desc,&socket_eset);
165 // time out after one millisecond
166 socket_timeout.tv_sec = 0;
167 socket_timeout.tv_usec = 1000;
169 if ((retval = select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
177 if ((r=read(socket_desc, buffer + leftover, length)) <= 0) {
178 print_error((gs_sp_t)"ERROR:could not read data from csv stream");
// Connect socket_desc to the data source that feeds this interface via gshub.
// Looks up the gshub (get_hub), then the stream source registered under
// `name` (get_streamsource), then opens a TCP socket and connects it to
// srcinfo.ip / srcinfo.port. Each failure is reported through print_error.
185 static void init_socket() {
188 struct sockaddr_in server;
189 gs_int32_t parserversion;
190 gs_uint32_t schemalen;
191 static gs_sp_t asciischema=0;
194 if (get_hub(&gshub)!=0) {
195 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
199 if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
200 print_error((gs_sp_t)"ERROR:could not find data source for stream\n");
204 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
205 if (socket_desc == -1)
207 print_error((gs_sp_t)"ERROR:could not create socket for data stream");
// NOTE(review): ip and port are copied into sockaddr_in without htonl()/htons();
// this is only correct if get_streamsource already returns network byte order -- confirm.
210 server.sin_addr.s_addr = srcinfo.ip;
211 server.sin_family = AF_INET;
212 server.sin_port = srcinfo.port;
214 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0) {
215 print_error((gs_sp_t)"ERROR: could not open connection to data source");
// Open the next input file into the global `fd`.
// Directory mode (dir_name set): dir_name is scanned once with scandir() in
// alphasort order, and entries are handed out in turn through file_pos; each
// namelist entry is freed once its path has been formatted into buf.
// The path in `name` is then opened. When singlefile is 0, the function first
// waits until lstat() finds the file, servicing the message queue every
// FILEWAIT_TIMEOUT microseconds. For encrypted input (use_decryption), the
// file is parsed as a DER PKCS7 message and decoded with rkey into mem_io.
220 static void next_file() {
222 static gs_uint32_t file_pos = 0;
223 static gs_uint32_t scan_finished = 0;
225 char buf[CSVMAXLINE];
// presumably reached once the directory scan is exhausted (scan_finished) -- confirm
230 fprintf(stderr,"Done processing, waiting for things to shut down\n");
232 // now just service message queue until we get killed or lose connectivity
234 fta_start_service(0); // service all waiting messages
235 usleep(1000); // sleep a millisecond
238 if (num_dir_files) { // we already started directory scan
240 if (file_pos < num_dir_files) {
// NOTE(review): sprintf into buf is unbounded; snprintf(buf, sizeof buf, ...)
// would guard against over-long directory/file names.
241 sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
243 free(namelist[file_pos]);
251 num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
252 if (num_dir_files == -1) {
254 print_error((gs_sp_t)"ERROR: Unable to scan directory");
257 if (num_dir_files == 2) { // only . and .. are there, empty dir
265 sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
267 free(namelist[file_pos]);
274 fprintf(stderr,"Opening %s\n",name);
276 if (singlefile == 0) {
277 while (lstat(name, &s) != 0) {
278 if (errno != ENOENT) {
279 print_error((gs_sp_t)"csv::lstat unexpected return value");
282 csv_replay_check_messages();
283 usleep(FILEWAIT_TIMEOUT);
289 if ((fd = open(name, O_RDONLY)) < 0) {
290 print_error((gs_sp_t)"csv::open failed ");
// hint to the kernel that the file will be read sequentially
293 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
296 if (use_decryption) {
297 // free SSL resources
303 FILE *fp = fdopen(fd, "r");
304 p7 = d2i_PKCS7_fp(fp, NULL);
306 print_error((gs_sp_t)"Error reading SMIME message from file");
310 if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
311 print_error((gs_sp_t)"Error decoding PKCS7 file\n");
318 if (!dir_name && !singlefile) {
// Processing-time accounting for BSA input, in get_posix_clock_time()
// milliseconds: the start of the file currently being read (0 = none), and
// the total over finished files (used for the tuple-rate report).
328 uint64_t bsa_file_start_time = 0;
329 uint64_t bsa_total_elapsed_time = 0;
// Move to the next file of the BSA stream: add the time spent on the previous
// file to bsa_total_elapsed_time, wait for the next file handle, then start
// timing the new file and open a reader on it.
331 static void next_file_bsa() {
334 if (bsa_file_start_time) {
335 bsa_total_elapsed_time += (get_posix_clock_time()- bsa_file_start_time);
336 bsa_file_start_time = 0;
// FILEWAIT_TIMEOUT is in microseconds, so this passes 10 -- presumably
// milliseconds; confirm the unit expected by getNextFileHandle.
339 ifh = stream->getNextFileHandle(FILEWAIT_TIMEOUT / 1000);
344 fprintf(stderr,"%s: Opening %s %s\n", dev, ifh->getHandle().c_str(), stream->getPositionHandle().c_str());
346 bsa_file_start_time = get_posix_clock_time();
347 reader = ifh->openFile();
// NOTE(review): counterpart of next_file_bsa(); presumably releases reader/ifh -- confirm.
354 static void close_file_bsa() {
// Load this interface's configuration with get_iface_properties(device, ...)
// into the file-level settings:
//   verbose, singlefile, bsa, encrypted - enabled when the value starts with "TRUE"
//   filename / directoryname           - input location; at least one is required
//   csvseparator                       - field delimiter handed to csv_set_delim
//   compressor                         - only "GZIP" is accepted
//   startupdelay                       - seconds to wait before processing starts
//   _max_csv_pos                       - stored in max_field_csv
//   gshub                              - read input via gshub (requires filename)
//   privatekey / password              - key and password files for encrypted input
// For encrypted input, the password is read into pwd and the private key is
// loaded into rkey through pass_cb. Finally, cur_packet is marked PTYPE_CSV.
371 static gs_retval_t csv_replay_init(gs_sp_t device)
377 gs_csp_t singlefiletmp;
378 gs_csp_t compressortmp;
380 gs_csp_t encryptedtmp;
381 gs_csp_t maxfieldtmp;
389 if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
390 if (strncmp(verbosetmp,"TRUE",4)==0) {
392 fprintf(stderr,"VERBOSE ENABLED\n");
394 fprintf(stderr,"VERBOSE DISABLED\n");
398 stringtmp=get_iface_properties(device,(gs_sp_t)"filename");
400 name = strdup(stringtmp);
404 stringtmp=get_iface_properties(device,(gs_sp_t)"directoryname");
406 dir_name = strdup(stringtmp);
410 if (!name && !dir_name) {
411 print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined");
415 tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
418 csv_set_delim(csvdel);
421 if ((singlefiletmp=get_iface_properties(device,(gs_sp_t)"singlefile"))!=0) {
422 if (strncmp(singlefiletmp,"TRUE",4)==0) {
425 fprintf(stderr,"SINGLEFILE ENABLED\n");
428 fprintf(stderr,"SINGLEFILE DISABLED\n");
432 if ((compressortmp=get_iface_properties(device,(gs_sp_t)"compressor"))!=0) {
433 if (strncmp(compressortmp,"GZIP",4)==0) {
436 fprintf(stderr,"USING ZLIP COMPRESSOR ENABLED\n");
438 print_error((gs_sp_t)"csv_replay_init::Unknown value for interface property \"Compressor\"");
443 if ((bsatmp=get_iface_properties(device,(gs_sp_t)"bsa"))!=0) {
444 if (strncmp(bsatmp,"TRUE",4)==0) {
446 print_error((gs_sp_t)"csv_replay_init::runtime not built with BSA support to use BSA interfaces");
452 fprintf(stderr,"USING BSA STREAMS\n");
456 if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
// NOTE(review): atoi() silently yields 0 for malformed values; strtoul with an
// end-pointer check would let bad configuration be reported.
458 fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
460 startupdelay=atoi(delaytmp);
463 if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
464 max_field_csv=atoi(maxfieldtmp);
467 if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
469 fprintf(stderr,"CSV format using gshub\n");
473 print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
478 pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
479 pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
481 if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
482 if (strncmp(encryptedtmp,"TRUE",4)==0) {
484 print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");
489 fprintf(stderr,"CSV file is encrypted\n");
491 if (!pkey_fname || !pwd_fname) {
492 print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface");
496 OpenSSL_add_all_algorithms();
497 ERR_load_crypto_strings();
499 // Read password file
500 FILE* in_fd = fopen(pwd_fname, "r");
502 fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
506 if (!fgets(pwd, CSVMAXLINE, in_fd)) {
507 fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
// strtok is used only to NUL-terminate pwd at the first CR/LF/tab/space after
// its first non-delimiter character: leading whitespace is kept, and a
// password containing whitespace is truncated.
510 strtok(pwd, "\r\n\t ");
513 // Read the private key
514 in_fd = fopen(pkey_fname, "r");
516 fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
520 rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
522 fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
531 cur_packet.ptype=PTYPE_CSV;
535 static inline int consume_chunk(gs_sp_t chunk, gs_uint32_t chunk_size) {
536 int tuple_consumed = 0;
537 gs_sp_t linepos = chunk;
538 gs_sp_t new_linepos = (gs_sp_t)memchr(linepos + leftover, '\n', chunk_size);
539 gs_sp_t end_pos = chunk + chunk_size + leftover;
540 leftover = chunk_size;
542 while (new_linepos) {
543 // *new_linepos = 0; // terminate the line
544 csv_parse_line(linepos, new_linepos - linepos);
545 rts_fta_process_packet(&cur_packet);
547 linepos = new_linepos + 1;
548 leftover = end_pos - linepos;
549 new_linepos = (gs_sp_t)memchr(linepos, '\n', leftover);
551 memcpy(chunk, linepos, leftover);
553 return tuple_consumed;
// Turn `chunk_size` newly read bytes into tuples and return how many were
// produced. On the inflate path, the compressed input is decompressed into
// out + leftover (after the carried-over partial record), up to CHUNK bytes at
// a time. Each piece is handed to consume_chunk() until inflate() leaves output
// space unused. Inflate errors end the stream with inflateEnd() and print
// "Error inflating data chunk".
// NOTE(review): the second consume_chunk() call presumably handles
// uncompressed input already read into out -- confirm.
556 static int csv_process_chunk(gs_uint32_t chunk_size)
559 gs_uint32_t have = chunk_size;
560 gs_uint32_t tuple_consumed = 0;
563 strm.avail_in = have;
565 /* run inflate() on input until output buffer not full */
567 strm.avail_out = CHUNK;
568 strm.next_out = out + leftover;
569 ret = inflate(&strm, Z_NO_FLUSH);
570 /* assert(ret != Z_STREAM_ERROR); state not clobbered */
573 ret = Z_DATA_ERROR; /* and fall through */
576 (void)inflateEnd(&strm);
580 fprintf(stderr,"Error inflating data chunk\n");
583 have = CHUNK - strm.avail_out;
584 tuple_consumed += consume_chunk((gs_sp_t)out, have);
585 } while (strm.avail_out == 0);
586 /* done when inflate() says it's done */
588 if (ret == Z_STREAM_END) {
595 tuple_consumed += consume_chunk((gs_sp_t)out, have);
598 return tuple_consumed;
// Fetch the next chunk of raw input and return its size in bytes, or -1 when
// nothing is available yet (csv_process_input then services the message queue).
// Sources: the gshub socket (read_chunk_socket), a BSA stream, a decrypted
// PKCS7 BIO (mem_io), or the plain file descriptor fd. Gzip input is read into
// `in` for later inflation; other input goes directly to out + leftover, after
// the partial record carried over from the previous chunk. A read of 0 bytes
// means the current source is exhausted. In singlefile mode this prints
// "SINGLEFILE PROCESSING DONE! RTS SAYS BYE".
601 static gs_int32_t csv_read_chunk() {
606 return read_chunk_socket((gs_sp_t)out, CHUNK);
608 gs_sp_t read_pos = (gs_sp_t)(use_gzip ? in : (out + leftover));
612 if (ifh == 0) next_file_bsa();
613 if (ifh == 0) // if no new files available return
614 return -1; // -1 indicates a timeout
616 while ((have = reader->read(read_pos, CHUNK)) == 0) {
621 if (ifh == 0) { // if no new files available return
622 return -1; // -1 indicates a timeout
// NOTE(review): 0 is a valid descriptor, so `fd <= 0` would treat it as unopened.
627 if (fd <= 0) next_file();
630 if (use_decryption) {
632 while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
635 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
646 while ((have = read(fd, read_pos, CHUNK)) == 0) {
649 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
// One processing round: read and process chunks until at least 50000 tuples
// have been produced. The count is checked only between chunks, so a round
// may overshoot. The tuple total is then added to the running totalcnt and
// reported with a rate; the visible return paths return 0.
// A -1 (timeout) from csv_read_chunk() returns 0 at once, so the caller can
// service the message queue.
667 static gs_retval_t csv_process_input()
670 static unsigned totalcnt = 0;
673 while(cnt < 50000) { // process up to 50000 tuples at a time
674 retval = csv_read_chunk();
675 if (retval == -1) return 0; // got a timeout so service message queue
// presumably taken when csv_read_chunk() signals end of input -- confirm
677 // we signal that everything is done
679 fprintf(stderr,"Done processing, waiting for things to shut down\n");
681 // now just service message queue until we get killed or lose connectivity
683 fta_start_service(0); // service all waiting messages
684 usleep(1000); // sleep a millisecond
687 cnt += csv_process_chunk((gs_uint32_t)retval);
689 totalcnt = totalcnt + cnt;
// bsa_total_elapsed_time is in milliseconds, hence the factor 1000.0; both
// rates print inf/nan while the elapsed time is still 0.
692 fprintf(stderr,"%s: Processed %u tuples, rate = %lf tup/sec\n", dev, totalcnt, 1000.0 * (double)totalcnt / (double)bsa_total_elapsed_time);
694 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
700 extern "C" gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
706 csv_replay_init(device);
708 /* initalize host_lib */
710 fprintf(stderr,"Init LFTAs for %s\n",device);
713 if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
714 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
719 fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
721 // set maximum field nubmer to be extracted by csv parser
722 csv_set_maxfield(max_field_csv);
724 cont = startupdelay + time(0);
726 if (verbose) { fprintf(stderr,"Start startup delay"); }
728 while (cont > time(NULL)) {
729 if (fta_start_service(0) < 0) {
730 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
733 usleep(1000); /* sleep for one millisecond */
736 if (verbose) { fprintf(stderr,"... Done\n"); }
738 // open the connection to the data source
739 if (gshub != 0) { init_socket();}
741 // wait to process till we get the signal from GSHUB
742 if (get_hub(&mygshub) != 0) {
743 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
746 while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
748 if (fta_start_service(0) < 0) {
749 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
754 /* now we enter an endless loop to process data */
756 fprintf(stderr,"Start processing %s\n",device);
761 stream = BSA::FileStream::ISubStream::construct(std::string(name));
766 st_time = time(NULL);
768 if (csv_process_input() < 0) {
769 fprintf(stderr,"%s::error:in processing records\n", device);
772 /* process all messages on the message queue*/
773 if (fta_start_service(0) < 0) {
774 fprintf(stderr,"%s::error:in processing the msg queue\n", device);