Improvements to aggregation code and fucntion library
[com/gs-lite.git] / src / lib / gscprts / rts_csv.cc
index 33eae61..ed7403a 100644 (file)
@@ -23,9 +23,9 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <zlib.h>
-#include "errno.h"
-#include "stdio.h"
-#include "stdlib.h"
+#include <errno.h>
+#include <stdio.h>
+#include <dirent.h>
 
 
 extern "C" {
@@ -64,6 +64,29 @@ z_stream strm;
 BSA::FileStream::ISubStream* stream;
 BSA::FileStream::IFileHandle* ifh;
 BSA::FileStream::Reader* reader;
+
+#endif
+
+#ifdef SSL_ENABLED
+#include <openssl/pem.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+#include <openssl/ssl.h>
+#include <openssl/crypto.h>
+#include <openssl/err.h>
+
+EVP_PKEY *rkey;
+PKCS7 *p7;
+BIO* mem_io;
+char pwd[CSVMAXLINE];
+
+// callback for passing password to private key reader
+int pass_cb(char *buf, int size, int rwflag, void *u) {
+    int len = strlen(pwd);
+    memcpy(buf, pwd, len);
+    return len;
+}
+
 #endif
 
 gs_sp_t dev;
@@ -72,6 +95,9 @@ static int listensockfd=-1;
 static int fd=-1;
 static struct packet cur_packet;
 static gs_sp_t name;
+static gs_sp_t dir_name;
+struct dirent **namelist;
+static gs_int32_t num_dir_files;
 static gs_sp_t line;
 static ssize_t len;
 static size_t line_len;
@@ -82,6 +108,7 @@ static gs_uint32_t startupdelay=0;
 static gs_uint32_t singlefile=0;
 static gs_uint32_t use_gzip=0;
 static gs_uint32_t use_bsa=0;
+static gs_uint32_t use_decryption=0;
 static gs_uint32_t gshub=0;
 static int socket_desc=0;
 
@@ -191,6 +218,57 @@ static void init_socket() {
 }
 
 static void next_file() {
+
+       static gs_uint32_t file_pos = 0;
+       static gs_uint32_t scan_finished = 0;
+
+       char buf[CSVMAXLINE];
+
+       if (dir_name) {
+               if (scan_finished) {
+                       if (verbose)
+                               fprintf(stderr,"Done processing, waiting for things to shut down\n");
+                       rts_fta_done();
+                       // now just service message queue until we get killed or loose connectivity
+                       while (true) {
+                               fta_start_service(0); // service all waiting messages
+                               usleep(1000); // sleep a millisecond
+                       }
+               }
+               if (num_dir_files) {            // we already started directory scan
+                       free(name);
+                       if (file_pos < num_dir_files) {
+                               sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
+                               name = strdup(buf);
+                               free(namelist[file_pos]);
+                               file_pos++;
+                       } else {
+                               free(namelist);
+                               scan_finished = 1;
+                               return;
+                       }
+               } else {
+                       num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
+                       if (num_dir_files == -1) {
+                               num_dir_files = 0;
+                               print_error((gs_sp_t)"ERROR: Unable to scan directory");
+                               return;
+                       }
+                       if (num_dir_files == 2) {       // only . and . are there, empty dir
+                               free(namelist[0]);
+                               free(namelist[1]);
+                               scan_finished = 1;
+                               return;
+                       } else
+                               file_pos = 2;
+                       
+                       sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
+                       name = strdup(buf);
+                       free(namelist[file_pos]);
+                       file_pos++;
+               }
+       }
+
        struct stat s;
        if (verbose) {
                fprintf(stderr,"Opening %s\n",name);
@@ -213,7 +291,31 @@ static void next_file() {
                exit(10);
        }
        posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
-       if (singlefile == 0) {
+
+#ifdef SSL_ENABLED
+       if (use_decryption) {
+               // free SSL resources
+               if (mem_io)
+                       BIO_free(mem_io);
+               if (p7)
+                       PKCS7_free(p7);
+
+               FILE *fp = fdopen(fd, "r");
+               p7 = d2i_PKCS7_fp(fp, NULL);
+       if (p7 == NULL) {
+               print_error((gs_sp_t)"Error reading SMIME message from file");
+               exit(-1);
+       }
+
+       if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
+               print_error((gs_sp_t)"Error decoding PKCS7 file\n");
+               exit(-1);
+       }
+
+               fclose(fp);
+       }
+#endif 
+       if (!dir_name && !singlefile) {
                unlink(name);
        }
        if (use_gzip) {
@@ -274,7 +376,13 @@ static gs_retval_t csv_replay_init(gs_sp_t device)
        gs_sp_t tempdel;
        gs_sp_t singlefiletmp;
        gs_sp_t compressortmp;
-       gs_sp_t bsatmp;    
+       gs_sp_t bsatmp;  
+       gs_sp_t encryptedtmp;  
+       gs_sp_t maxfieldtmp
+
+       gs_sp_t pkey_fname;  
+       gs_sp_t pwd_fname;              
+
 
        if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
                if (strncmp(verbosetmp,"TRUE",4)==0) {
@@ -285,10 +393,13 @@ static gs_retval_t csv_replay_init(gs_sp_t device)
                }
        }
 
-       if ((name=get_iface_properties(device,(gs_sp_t)"filename"))==0) {
-               print_error((gs_sp_t)"csv_replay_init::No CSV \"Filename\" defined");
+       name=get_iface_properties(device,(gs_sp_t)"filename");
+       dir_name=get_iface_properties(device,(gs_sp_t)"directoryname");
+       if (!name && !dir_name) {
+               print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined");
                exit(0);
        }
+
        tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
        if (tempdel != 0 ) {
                csvdel = tempdel[0];
@@ -332,15 +443,77 @@ static gs_retval_t csv_replay_init(gs_sp_t device)
 
        if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
                if (verbose) {
-                               fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
+                               fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
                }
-               startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
+               startupdelay=atoi(delaytmp);
        }
+
+       if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
+               max_field_csv=atoi(maxfieldtmp);
+       }       
+
        if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
                if (verbose) {
                                fprintf(stderr,"CSV format using gshub\n");
                }
                gshub=1;
+               if (!name) {
+                       print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
+                       exit(0);                        
+               }
+       }
+
+       pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
+       pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
+
+       if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
+               if (strncmp(encryptedtmp,"TRUE",4)==0) {
+                       #ifndef SSL_ENABLED
+                               print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");                
+                               exit(0);
+                       #else
+                               use_decryption=1;
+                               if (verbose) {
+                                       fprintf(stderr,"CSV file is encrypted\n");
+                               }
+                               if (!pkey_fname || !pwd_fname) {
+                                       print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface");            
+                                       exit(0);
+                               }
+
+                               OpenSSL_add_all_algorithms();
+                               ERR_load_crypto_strings();
+
+                               // Read password file
+                               FILE* in_fd = fopen(pwd_fname, "r");
+                               if (!in_fd) {
+                                       fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
+                                       exit(0);        
+                               }
+
+                               if (!fgets(pwd, CSVMAXLINE, in_fd)) {
+                                       fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
+                                       exit(0);                                        
+                               }
+                               strtok(pwd, "\r\n\t ");
+                               fclose(in_fd);                  
+
+                               // Read the private key
+                               in_fd = fopen(pkey_fname, "r");
+                               if (!in_fd) {
+                                       fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
+                                       exit(0);        
+                               }
+
+                               rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
+                               if (!rkey) {
+                                       fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
+                                       exit(-1);        
+                               }
+                               
+                               fclose(in_fd);
+                       #endif
+               }
        }
 
        cur_packet.ptype=PTYPE_CSV;
@@ -439,8 +612,12 @@ static gs_int32_t csv_read_chunk() {
                }
                } else {
 #endif
-                       if (fd <= 0) next_file();
-                       while ((have = read(fd, read_pos, CHUNK)) == 0) {
+               if (fd <= 0) next_file();
+                       
+#ifdef SSL_ENABLED
+               if (use_decryption) {
+
+               while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
                                if (singlefile==1) {
                                        if(verbose) {
                                                fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
@@ -450,7 +627,24 @@ static gs_int32_t csv_read_chunk() {
                                } else {
                                        next_file();
                                }
+               }
+
+               }       else {                  
+#endif
+               while ((have = read(fd, read_pos, CHUNK)) == 0) {
+                       if (singlefile==1) {
+                               if(verbose) {
+                                       fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
+                               }
+                               return -2;
+
+                       } else {
+                               next_file();
                        }
+               }
+#ifdef SSL_ENABLED             
+               }
+#endif
 #ifdef BSA_ENABLED             
                }
 #endif