Refactor csv input processing. Add support for kafka interfaces. Fix bug in join...
[com/gs-lite.git] / include / lfta / schema_prototypes.h
index c50e508..1b69610 100644 (file)
@@ -27,6 +27,9 @@
 #include "md_stdlib.h"
 #include "schemaparser.h"
 
+// parser sanity checks
+// #define PARSER_SANITY_CHECKS
+
 
 // *** SAMPLING RELATED CONSTANTS
 // ******************************
@@ -66,96 +69,9 @@ static inline gs_retval_t get_schemaId(struct packet * p, gs_uint32_t * t)
        return 0;
 }
 
-/* CSV access function using position as 3rd argument */
-
-static inline gs_retval_t get_csv_uint(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
-{
-       if (p->ptype != PTYPE_CSV) return -1;
-       if (p->record.csv.numberfields < pos) return -1;
-    *t = strtoul((const char*)p->record.csv.fields[pos-1], NULL, 10);
-       return 0;
-}
-
-static inline gs_retval_t get_csv_ullong(struct packet * p, gs_uint64_t * t,gs_uint32_t pos)
-{
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-    *t = strtoull((const char*)p->record.csv.fields[pos-1], NULL, 10);    
-    return 0;
-}
-
-static inline gs_retval_t get_csv_ip(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
-{
-       unsigned ip1,ip2,ip3,ip4;
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-    sscanf((const char*) p->record.csv.fields[pos-1],"%u.%u.%u.%u",&ip1,&ip2,&ip3,&ip4);
-       *t=(ip1<<24)|(ip2<<16)|(ip3<<8)|ip4;
-    return 0;
-}
-static inline gs_retval_t get_csv_ipv6(struct packet * p, struct ipv6_str * t,gs_uint32_t pos)
-{
-    gs_uint32_t v[8];
-       if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-    sscanf((const char*) p->record.csv.fields[pos-1],"%x:%x:%x:%x:%x:%x:%x:%x",&v[0],&v[1],&v[2],&v[3],&v[4],&v[5],&v[6],&v[7]);
-       t->v[0]=htonl(v[0]<<16|v[1]);
-       t->v[1]=htonl(v[2]<<16|v[3]);
-       t->v[2]=htonl(v[4]<<16|v[5]);
-       t->v[3]=htonl(v[6]<<16|v[7]);
-       
-    return 0;
-}
-static inline gs_retval_t get_csv_string(struct packet * p, struct gs_string * t,gs_uint32_t pos)
-{
-    
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-       t->data=(gs_sp_t)p->record.csv.fields[pos-1];
-    if (pos == p->record.csv.numberfields)
-        t->length=strlen((const char*)p->record.csv.fields[pos-1]);
-    else
-        t->length=p->record.csv.fields[pos] - p->record.csv.fields[pos-1] - 1;
-       t->owner=0;
-    return 0;
-}
-static inline gs_retval_t get_csv_bool(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
-{
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-       *t=0;
-       if ((strlen((const char*)p->record.csv.fields[pos-1])==4) &&
-               (strncasecmp("TRUE",(const char*)p->record.csv.fields[pos-1],4) ==0) ) {
-               *t=1;
-       }
-    return 0;
-}
-static inline gs_retval_t get_csv_int(struct packet * p, gs_int32_t * t,gs_uint32_t pos)
-{
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-    *t = strtol((const char*)p->record.csv.fields[pos-1], NULL, 10);
-    return 0;
-}
-static inline gs_retval_t get_csv_llong(struct packet * p, gs_int64_t * t,gs_uint32_t pos)
-{
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-    *t = strtoll((const char*)p->record.csv.fields[pos-1], NULL, 10);
-    return 0;
-}
-static inline gs_retval_t get_csv_float(struct packet * p, gs_float_t * t,gs_uint32_t pos)
-{
-    if (p->ptype != PTYPE_CSV) return -1;
-    if (p->record.csv.numberfields < pos) return -1;
-    *t = strtod((const char*)p->record.csv.fields[pos-1], NULL);
-    return 0;
-}
-
-#include <lfta/csv_macro.h>
-
-static inline __attribute__((always_inline)) unsigned int gs_strtoul (const char *str, size_t len) {
-        unsigned int value = 0;
+// fast unsigned integer parsing functions
+static inline __attribute__((always_inline)) unsigned long gs_strtoul (const char *str, size_t len) {
+        unsigned long value = 0;
 
         switch (len) { // handle up to 10 digits, assume we're 32-bit
             case 10:    value += (str[len-10] - '0') * 1000000000;
@@ -204,32 +120,91 @@ static inline __attribute__((always_inline)) unsigned long long gs_strtoull (con
         }
 }
 
-static inline gs_retval_t get_csv2_uint(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
+static inline __attribute__((always_inline)) long gs_strtol (const char *str, size_t len) {
+    long sign = 1;
+    if (str[0] == '-') { 
+        sign = -1;
+        ++str;
+        --len;
+    }
+    return sign * gs_strtoul(str, len);
+}
+
+static inline __attribute__((always_inline)) long gs_strtoll (const char *str, size_t len) {
+    long long sign = 1;
+    if (str[0] == '-') { 
+        sign = -1;
+        ++str;
+        --len;
+    }
+    return sign * gs_strtoull(str, len);
+}
+
+/* CSV access function using position as 3rd argument */
+
+static inline gs_retval_t get_csv_uint(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
 {
-//    *t = strtoul((const char*)p->record.csv2.fields[pos-1], NULL, 10);
-    *t = gs_strtoul((const char*)p->record.csv2.fields[pos-1], p->record.csv2.field_lens[pos-1]);
-    
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif
+//    *t = strtoul((const char*)p->record.csv.fields[pos-1], NULL, 10);
+    *t = gs_strtoul((const char*)p->record.csv.fields[pos-1], p->record.csv.field_lens[pos-1]);
        return 0;
 }
 
-static inline gs_retval_t get_csv2_ullong(struct packet * p, gs_uint64_t * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_ullong(struct packet * p, gs_uint64_t * t,gs_uint32_t pos)
 {
-//    *t = strtoull((const char*)p->record.csv2.fields[pos-1], NULL, 10);    
-    *t = gs_strtoull((const char*)p->record.csv2.fields[pos-1], p->record.csv2.field_lens[pos-1]);
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif
+//    *t = strtoull((const char*)p->record.csv.fields[pos-1], NULL, 10);
+    *t = gs_strtoull((const char*)p->record.csv.fields[pos-1], p->record.csv.field_lens[pos-1]);
     return 0;
 }
 
-static inline gs_retval_t get_csv2_ip(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_ip(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
 {
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif
+    // parsed data is not NULL temrinated, we can terminate it to be able to use standard C functions
+    // exception is the last field than needs to be copied
+    gs_int8_t buffer[256];
+    gs_sp_t data=(gs_sp_t)p->record.csv.fields[pos-1];
+    gs_uint32_t data_len = p->record.csv.field_lens[pos-1];
+    if (pos == p->record.csv.numberfields) {
+        memcpy(buffer, data, data_len);
+        data = buffer;
+    }
+    data[data_len] = '\0';
+
        unsigned ip1,ip2,ip3,ip4;
-    sscanf((const char*) p->record.csv2.fields[pos-1],"%u.%u.%u.%u",&ip1,&ip2,&ip3,&ip4);
+    sscanf((const char*)data,"%u.%u.%u.%u",&ip1,&ip2,&ip3,&ip4);
        *t=(ip1<<24)|(ip2<<16)|(ip3<<8)|ip4;
     return 0;
 }
-static inline gs_retval_t get_csv2_ipv6(struct packet * p, struct ipv6_str * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_ipv6(struct packet * p, struct ipv6_str * t,gs_uint32_t pos)
 {
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif 
+    // parsed data is not NULL temrinated, we can terminate it to be able to use standard C functions
+    // exception is the last field than needs to be copied before we can terminate it
+    gs_int8_t buffer[256];
+    gs_sp_t data=(gs_sp_t)p->record.csv.fields[pos-1];
+    gs_uint32_t data_len = p->record.csv.field_lens[pos-1];
+    if (pos == p->record.csv.numberfields) {
+        memcpy(buffer, data, data_len);
+        data = buffer;
+    }
+    data[data_len] = '\0';
+
     gs_uint32_t v[8];
-    sscanf((const char*) p->record.csv2.fields[pos-1],"%x:%x:%x:%x:%x:%x:%x:%x",&v[0],&v[1],&v[2],&v[3],&v[4],&v[5],&v[6],&v[7]);
+    sscanf((const char*)data,"%x:%x:%x:%x:%x:%x:%x:%x",&v[0],&v[1],&v[2],&v[3],&v[4],&v[5],&v[6],&v[7]);
        t->v[0]=htonl(v[0]<<16|v[1]);
        t->v[1]=htonl(v[2]<<16|v[3]);
        t->v[2]=htonl(v[4]<<16|v[5]);
@@ -237,45 +212,73 @@ static inline gs_retval_t get_csv2_ipv6(struct packet * p, struct ipv6_str * t,g
        
     return 0;
 }
-static inline gs_retval_t get_csv2_string(struct packet * p, struct gs_string * t,gs_uint32_t pos)
-{ 
-       t->data=(gs_sp_t)p->record.csv2.fields[pos-1];
-    /*
-    if (pos == p->record.csv2.numberfields)
-        t->length=strlen((const char*)p->record.csv2.fields[pos-1]);
-    else
-        t->length=p->record.csv2.fields[pos] - p->record.csv2.fields[pos-1] - 1;
-    */
-    t->length=p->record.csv2.field_lens[pos-1];
+static inline gs_retval_t get_csv_string(struct packet * p, struct gs_string * t,gs_uint32_t pos)
+{
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif 
+       t->data=(gs_sp_t)p->record.csv.fields[pos-1];
+    t->length=p->record.csv.field_lens[pos-1];
        t->owner=0;
     return 0;
 }
-static inline gs_retval_t get_csv2_bool(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_bool(struct packet * p, gs_uint32_t * t,gs_uint32_t pos)
 {
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif 
        *t=0;
-       if ((strlen((const char*)p->record.csv2.fields[pos-1])==4) &&
-               (strncasecmp("TRUE",(const char*)p->record.csv2.fields[pos-1],4) ==0) ) {
+       if ((p->record.csv.field_lens[pos-1]==4) &&
+               (strncasecmp("TRUE",(const char*)p->record.csv.fields[pos-1],4) ==0) ) {
                *t=1;
        }
     return 0;
 }
-static inline gs_retval_t get_csv2_int(struct packet * p, gs_int32_t * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_int(struct packet * p, gs_int32_t * t,gs_uint32_t pos)
 {
-    *t = strtol((const char*)p->record.csv2.fields[pos-1], NULL, 10);
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif
+    //*t = strtol((const char*)p->record.csv.fields[pos-1], NULL, 10);
+    *t = gs_strtol((const char*)p->record.csv.fields[pos-1], p->record.csv.field_lens[pos-1]);
     return 0;
 }
-static inline gs_retval_t get_csv2_llong(struct packet * p, gs_int64_t * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_llong(struct packet * p, gs_int64_t * t,gs_uint32_t pos)
 {
-    *t = strtoll((const char*)p->record.csv2.fields[pos-1], NULL, 10);
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif
+    //*t = strtoll((const char*)p->record.csv.fields[pos-1], NULL, 10);
+    *t = gs_strtoll((const char*)p->record.csv.fields[pos-1], p->record.csv.field_lens[pos-1]);    
     return 0;
 }
-static inline gs_retval_t get_csv2_float(struct packet * p, gs_float_t * t,gs_uint32_t pos)
+static inline gs_retval_t get_csv_float(struct packet * p, gs_float_t * t,gs_uint32_t pos)
 {
-    *t = strtod((const char*)p->record.csv2.fields[pos-1], NULL);
+#ifdef PARSER_SANITY_CHECKS    
+       if (p->ptype != PTYPE_CSV) return -1;
+       if (p->record.csv.numberfields < pos) return -1;
+#endif
+    // parsed data is not NULL temrinated, we can terminate it to be able to use standard C functions
+    // exception is the last field than needs to be copied before we can terminate it
+    gs_int8_t buffer[256];
+    gs_sp_t data=(gs_sp_t)p->record.csv.fields[pos-1];
+    gs_uint32_t data_len = p->record.csv.field_lens[pos-1];
+    if (pos == p->record.csv.numberfields) {
+        memcpy(buffer, data, data_len);
+        data = buffer;
+    }
+    data[data_len] = '\0';
+
+    *t = strtod((const char*)data, NULL);
     return 0;
 }
 
-#include <lfta/csv2_macro.h>
+#include <lfta/csv_macro.h>
+
 
 /* GDAT access function using position as 3rd argument */