Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscpapp / appinterface.c
index c820432..961926f 100644 (file)
-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
\r
- http://www.apache.org/licenses/LICENSE-2.0\r
\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include "app.h"\r
-#include "fta.h"\r
-#include "lapp.h"\r
-#include "string.h"\r
-#include "stdio.h"\r
-#include "stdlib.h"\r
-#include "schemaparser.h"\r
-#include "gshub.h"\r
-\r
-\r
-// Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta\r
-gs_sp_t fta_names[]={0};\r
-\r
-\r
-/* HIGH LEVEL APPLICATION INTERFACE */\r
-/* ================================ */\r
-\r
-struct fta_instance {\r
-    gs_sp_t name;\r
-    FTAID ftaid;\r
-    gs_int32_t used;\r
-    gs_schemahandle_t schema;\r
-};\r
-\r
-\r
-struct fta_instance * instance_array=0;\r
-gs_int32_t instance_array_sz=0;\r
-\r
-static gs_retval_t\r
-add_fta(gs_sp_t name, FTAID ftaid,\r
-        gs_schemahandle_t schema)\r
-{\r
-    gs_int32_t x;\r
-    if ( instance_array_sz == 0) {\r
-        if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {\r
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-            return -1;\r
-        }\r
-        memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);\r
-        instance_array_sz = STARTSZ;\r
-    }\r
-    for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);\r
-    if (x == instance_array_sz) {\r
-        gs_int32_t y;\r
-        instance_array_sz = 2*instance_array_sz;\r
-        if ((instance_array =\r
-             realloc(instance_array,instance_array_sz*\r
-                     sizeof(struct fta_instance)))==0) {\r
-                 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
-                 return -1;\r
-             }\r
-        for (y=x;y<instance_array_sz;x++)\r
-            instance_array[y].used=0;\r
-    }\r
-    instance_array[x].name=strdup(name);\r
-    instance_array[x].ftaid=ftaid;\r
-    instance_array[x].schema=schema;\r
-    instance_array[x].used=1;\r
-    return 0;\r
-}\r
-\r
-static gs_retval_t\r
-rm_fta(FTAID ftaid)\r
-{\r
-    gs_int32_t x;\r
-    for (x=0;x<instance_array_sz;x++) {\r
-        if ( (instance_array[x].ftaid.ip=ftaid.ip)\r
-            && (instance_array[x].ftaid.port==ftaid.port)\r
-            && (instance_array[x].ftaid.index==ftaid.index)\r
-            && (instance_array[x].ftaid.streamid==ftaid.streamid)){\r
-            instance_array[x].used=0;\r
-        }\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-static struct fta_instance *\r
-get_fta(FTAID ftaid)\r
-{\r
-    gs_int32_t x;\r
-    for (x=0;x<instance_array_sz;x++) {\r
-        if (( instance_array[x].used!=0 )\r
-            && (instance_array[x].ftaid.ip==ftaid.ip)\r
-            && (instance_array[x].ftaid.port==ftaid.port)\r
-            && (instance_array[x].ftaid.index==ftaid.index)\r
-            && (instance_array[x].ftaid.streamid==ftaid.streamid))\r
-        {\r
-            return &instance_array[x];\r
-        }\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-\r
-gs_retval_t\r
-ftaapp_init(gs_uint32_t bufsz)\r
-{\r
-    \r
-    endpoint gshub;\r
-    FTAID myftaid;\r
-    gs_sp_t name = "app\0";\r
-    if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {\r
-        gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");\r
-        return -1;\r
-    }\r
-    if (get_hub(&gshub)!=0) {\r
-         gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");\r
-         return -1;\r
-    }\r
-    myftaid=gscpipc_getftaid();\r
-    if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {\r
-        gslog(LOG_EMERG,"ERROR:could not set_ftainstance");\r
-        return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* this should be used before exiting to make sure everything gets\r
- cleaned up\r
- */\r
-gs_retval_t\r
-ftaapp_exit()\r
-{\r
-    gs_int32_t x;\r
-    for (x=0;x<instance_array_sz;x++) {\r
-        if (instance_array[x].used!=0) {\r
-            ftaapp_remove_fta(instance_array[x].ftaid,1);\r
-        }\r
-    }\r
-    hostlib_free();\r
-    return 0;\r
-}\r
-\r
-/* adds an FTA by key returns unique streamid which can be used to reference FTA*/\r
-\r
-\r
-FTAID\r
-ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,\r
-               gs_int32_t command, gs_int32_t sz, void *  data)\r
-{\r
-    gs_int8_t schemabuf[MAXSCHEMASZ];\r
-    gs_schemahandle_t schema;\r
-    FTAID f;\r
-    FTAID ferr;\r
-    ferr.ip=0;\r
-    ferr.port=0;\r
-    ferr.index=0;\r
-    ferr.streamid=0;\r
-    \r
-    \r
-    if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");\r
-        return ferr;\r
-    }\r
-    \r
-    \r
-    if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");\r
-        fprintf(stderr,"/n%s\n",schemabuf);\r
-        return ferr;\r
-    }\r
-    \r
-    if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,\r
-                            command,sz,data))!=0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
-        ftaschema_free(schema);\r
-        return ferr;\r
-    }\r
-    \r
-    if (f.streamid==0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
-        ftaschema_free(schema);\r
-        return ferr;\r
-    }\r
-    \r
-    //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);\r
-    if (add_fta((gs_sp_t)name,f,schema)<0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");\r
-        fta_free_instance(gscpipc_getftaid(),f,1);\r
-        ftaschema_free(schema);\r
-        return ferr;\r
-    }\r
-    \r
-    return f;\r
-}\r
-\r
-FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,\r
-                           gs_int32_t command, gs_int32_t sz,\r
-                           void *  data,gs_sp_t path,\r
-                           gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,\r
-                           gs_uint32_t delta, gs_uint32_t split) {\r
-    gs_int8_t schemabuf[MAXSCHEMASZ];\r
-    gs_schemahandle_t schema;\r
-    FTAID f;\r
-    FTAID ferr;\r
-    ferr.ip=0;\r
-    ferr.port=0;\r
-    ferr.index=0;\r
-    ferr.streamid=0;\r
-    \r
-    \r
-    if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");\r
-        return ferr;\r
-    }\r
-    \r
-    if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");\r
-        return ferr;\r
-    }\r
-    \r
-    if ((fta_alloc_print_instance(gscpipc_getftaid(),\r
-                                  &f,name,schemabuf,reusable,\r
-                                  command,sz,data,path,\r
-                                  basename,temporal_field,split_field,delta,split))!=0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
-        ftaschema_free(schema);\r
-        return ferr;\r
-    }\r
-    \r
-    if (f.streamid==0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
-        ftaschema_free(schema);\r
-        return ferr;\r
-    }\r
-    \r
-    if (add_fta((gs_sp_t)name,f,schema)<0) {\r
-        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");\r
-        fta_free_instance(gscpipc_getftaid(),f,1);\r
-        ftaschema_free(schema);\r
-        return ferr;\r
-    }\r
-    \r
-    return f;\r
-}\r
-\r
-/* get the schema definition of an FTA */\r
-gs_schemahandle_t\r
-ftaapp_get_fta_schema(FTAID ftaid)\r
-{\r
-    struct fta_instance * fi;\r
-    \r
-    //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);\r
-    \r
-    if ((fi=get_fta(ftaid))==0) {\r
-        gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");\r
-        return -1;\r
-    }\r
-    return fi->schema;\r
-}\r
-\r
-/* get the asci schema definition for the FTA associated with the FTA name */\r
-gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)\r
-{\r
-    FTAID f;\r
-    gs_int8_t schemabuf[MAXSCHEMASZ];\r
-    gs_schemahandle_t schema;\r
-    if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
-        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");\r
-        return -1;\r
-    }\r
-    if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
-        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");\r
-        return -1;\r
-    }\r
-    return schema;\r
-}\r
-\r
-/* get the asci schema definition for the FTA associated with the FTA name */\r
-gs_sp_t  ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)\r
-{\r
-    FTAID f;\r
-    static gs_int8_t schemabuf[MAXSCHEMASZ];\r
-    if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
-        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");\r
-        return 0;\r
-    }\r
-    return schemabuf;\r
-}\r
-\r
-/* control operations keyed of one to one mapping of stream id */\r
-gs_retval_t\r
-ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void *  data)\r
-{\r
-    struct fta_instance * fi;\r
-    if ((fi=get_fta(f))<0) {\r
-        gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");\r
-        return -1;\r
-    }\r
-    return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);\r
-}\r
-\r
-/* remove FTA keyed of stream id */\r
-gs_retval_t\r
-ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)\r
-{\r
-    struct fta_instance * fi;\r
-    if ((fi=get_fta(f))<0) {\r
-        gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");\r
-        return -1;\r
-    }\r
-    \r
-    fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);\r
-    ftaschema_free(fi->schema);\r
-    rm_fta(f);\r
-    \r
-    return 0;\r
-}\r
-\r
-/* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove\r
- the heartbeat tuples*/\r
-gs_retval_t\r
-ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,\r
-                 gs_int32_t tbuf_size, gs_int32_t timeout)\r
-{\r
-    gs_uint64_t trace_id;\r
-    gs_uint32_t sz;\r
-    fta_stat * trace;\r
-    gs_sp_t trace_buffer;\r
-    gs_retval_t res;\r
-    \r
-get_tuple_again:\r
-    res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);\r
-    \r
-    if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {\r
-        FTAID myftaid;\r
-        myftaid=gscpipc_getftaid();\r
-        /* extract trace */\r
-        if (ftaschema_get_trace(get_fta(*ftaid)->schema,\r
-                                tbuffer, *size, &trace_id, &sz, &trace))\r
-        {\r
-            gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");\r
-            goto get_tuple_again;\r
-        }\r
-        \r
-               if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {\r
-            gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");\r
-            goto get_tuple_again;\r
-        }\r
-        \r
-        /* generate a heartbeat */\r
-        memcpy(trace_buffer, trace, sz * sizeof(fta_stat));\r
-        /* append producers fta_stat to the trace */\r
-        /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */\r
-        memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));\r
-\r
-        memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));\r
-\r
-        fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);\r
-               free(trace_buffer);\r
-        res=2; //indicate that it is a temporal tuple\r
-    }\r
-    return res;\r
-}\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "app.h"
+#include "fta.h"
+#include "lapp.h"
+#include "string.h"
+#include "stdio.h"
+#include "stdlib.h"
+#include "schemaparser.h"
+#include "gshub.h"
+
+
+// Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta
+gs_sp_t fta_names[]={0};
+
+
+/* HIGH LEVEL APPLICATION INTERFACE */
+/* ================================ */
+
+struct fta_instance {
+    gs_sp_t name;
+    FTAID ftaid;
+    gs_int32_t used;
+    gs_schemahandle_t schema;
+};
+
+
+struct fta_instance * instance_array=0;
+gs_int32_t instance_array_sz=0;
+
+static gs_retval_t
+add_fta(gs_sp_t name, FTAID ftaid,
+        gs_schemahandle_t schema)
+{
+    gs_int32_t x;
+    if ( instance_array_sz == 0) {
+        if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+            return -1;
+        }
+        memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);
+        instance_array_sz = STARTSZ;
+    }
+    for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);
+    if (x == instance_array_sz) {
+        gs_int32_t y;
+        instance_array_sz = 2*instance_array_sz;
+        if ((instance_array =
+             realloc(instance_array,instance_array_sz*
+                     sizeof(struct fta_instance)))==0) {
+                 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+                 return -1;
+             }
+        for (y=x;y<instance_array_sz;x++)
+            instance_array[y].used=0;
+    }
+    instance_array[x].name=strdup(name);
+    instance_array[x].ftaid=ftaid;
+    instance_array[x].schema=schema;
+    instance_array[x].used=1;
+    return 0;
+}
+
+static gs_retval_t
+rm_fta(FTAID ftaid)
+{
+    gs_int32_t x;
+    for (x=0;x<instance_array_sz;x++) {
+        if ( (instance_array[x].ftaid.ip=ftaid.ip)
+            && (instance_array[x].ftaid.port==ftaid.port)
+            && (instance_array[x].ftaid.index==ftaid.index)
+            && (instance_array[x].ftaid.streamid==ftaid.streamid)){
+            instance_array[x].used=0;
+        }
+    }
+    return 0;
+}
+
+
+static struct fta_instance *
+get_fta(FTAID ftaid)
+{
+    gs_int32_t x;
+    for (x=0;x<instance_array_sz;x++) {
+        if (( instance_array[x].used!=0 )
+            && (instance_array[x].ftaid.ip==ftaid.ip)
+            && (instance_array[x].ftaid.port==ftaid.port)
+            && (instance_array[x].ftaid.index==ftaid.index)
+            && (instance_array[x].ftaid.streamid==ftaid.streamid))
+        {
+            return &instance_array[x];
+        }
+    }
+    return 0;
+}
+
+
+
+gs_retval_t
+ftaapp_init(gs_uint32_t bufsz)
+{
+    
+    endpoint gshub;
+    FTAID myftaid;
+    gs_sp_t name = "app\0";
+    if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {
+        gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");
+        return -1;
+    }
+    if (get_hub(&gshub)!=0) {
+         gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");
+         return -1;
+    }
+    myftaid=gscpipc_getftaid();
+    if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {
+        gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
+        return -1;
+    }
+    return 0;
+}
+
+/* this should be used before exiting to make sure everything gets
+ cleaned up
+ */
+gs_retval_t
+ftaapp_exit()
+{
+    gs_int32_t x;
+    for (x=0;x<instance_array_sz;x++) {
+        if (instance_array[x].used!=0) {
+            ftaapp_remove_fta(instance_array[x].ftaid,1);
+        }
+    }
+    hostlib_free();
+    return 0;
+}
+
+/* adds an FTA by key returns unique streamid which can be used to reference FTA*/
+
+
+FTAID
+ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
+               gs_int32_t command, gs_int32_t sz, void *  data)
+{
+    gs_int8_t schemabuf[MAXSCHEMASZ];
+    gs_schemahandle_t schema;
+    FTAID f;
+    FTAID ferr;
+    ferr.ip=0;
+    ferr.port=0;
+    ferr.index=0;
+    ferr.streamid=0;
+    
+    
+    if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
+        return ferr;
+    }
+    
+    
+    if ((schema=ftaschema_parse_string(schemabuf))<0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
+        fprintf(stderr,"/n%s\n",schemabuf);
+        return ferr;
+    }
+    
+    if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,
+                            command,sz,data))!=0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+        ftaschema_free(schema);
+        return ferr;
+    }
+    
+    if (f.streamid==0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+        ftaschema_free(schema);
+        return ferr;
+    }
+    
+    //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);
+    if (add_fta((gs_sp_t)name,f,schema)<0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
+        fta_free_instance(gscpipc_getftaid(),f,1);
+        ftaschema_free(schema);
+        return ferr;
+    }
+    
+    return f;
+}
+
+FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
+                           gs_int32_t command, gs_int32_t sz,
+                           void *  data,gs_sp_t path,
+                           gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,
+                           gs_uint32_t delta, gs_uint32_t split) {
+    gs_int8_t schemabuf[MAXSCHEMASZ];
+    gs_schemahandle_t schema;
+    FTAID f;
+    FTAID ferr;
+    ferr.ip=0;
+    ferr.port=0;
+    ferr.index=0;
+    ferr.streamid=0;
+    
+    
+    if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
+        return ferr;
+    }
+    
+    if ((schema=ftaschema_parse_string(schemabuf))<0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
+        return ferr;
+    }
+    
+    if ((fta_alloc_print_instance(gscpipc_getftaid(),
+                                  &f,name,schemabuf,reusable,
+                                  command,sz,data,path,
+                                  basename,temporal_field,split_field,delta,split))!=0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+        ftaschema_free(schema);
+        return ferr;
+    }
+    
+    if (f.streamid==0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+        ftaschema_free(schema);
+        return ferr;
+    }
+    
+    if (add_fta((gs_sp_t)name,f,schema)<0) {
+        gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
+        fta_free_instance(gscpipc_getftaid(),f,1);
+        ftaschema_free(schema);
+        return ferr;
+    }
+    
+    return f;
+}
+
+/* get the schema definition of an FTA */
+gs_schemahandle_t
+ftaapp_get_fta_schema(FTAID ftaid)
+{
+    struct fta_instance * fi;
+    
+    //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);
+    
+    if ((fi=get_fta(ftaid))==0) {
+        gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");
+        return -1;
+    }
+    return fi->schema;
+}
+
+/* get the asci schema definition for the FTA associated with the FTA name */
+gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)
+{
+    FTAID f;
+    gs_int8_t schemabuf[MAXSCHEMASZ];
+    gs_schemahandle_t schema;
+    if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
+        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
+        return -1;
+    }
+    if ((schema=ftaschema_parse_string(schemabuf))<0) {
+        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");
+        return -1;
+    }
+    return schema;
+}
+
+/* get the asci schema definition for the FTA associated with the FTA name */
+gs_sp_t  ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)
+{
+    FTAID f;
+    static gs_int8_t schemabuf[MAXSCHEMASZ];
+    if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
+        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
+        return 0;
+    }
+    return schemabuf;
+}
+
+/* control operations keyed of one to one mapping of stream id */
+gs_retval_t
+ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void *  data)
+{
+    struct fta_instance * fi;
+    if ((fi=get_fta(f))<0) {
+        gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
+        return -1;
+    }
+    return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);
+}
+
+/* remove FTA keyed of stream id */
+gs_retval_t
+ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)
+{
+    struct fta_instance * fi;
+    if ((fi=get_fta(f))<0) {
+        gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
+        return -1;
+    }
+    
+    fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);
+    ftaschema_free(fi->schema);
+    rm_fta(f);
+    
+    return 0;
+}
+
+/* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove
+ the heartbeat tuples*/
+gs_retval_t
+ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,
+                 gs_int32_t tbuf_size, gs_int32_t timeout)
+{
+    gs_uint64_t trace_id;
+    gs_uint32_t sz;
+    fta_stat * trace;
+    gs_sp_t trace_buffer;
+    gs_retval_t res;
+    
+get_tuple_again:
+    res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);
+    
+    if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {
+        FTAID myftaid;
+        myftaid=gscpipc_getftaid();
+        /* extract trace */
+        if (ftaschema_get_trace(get_fta(*ftaid)->schema,
+                                tbuffer, *size, &trace_id, &sz, &trace))
+        {
+            gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");
+            goto get_tuple_again;
+        }
+        
+               if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {
+            gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");
+            goto get_tuple_again;
+        }
+        
+        /* generate a heartbeat */
+        memcpy(trace_buffer, trace, sz * sizeof(fta_stat));
+        /* append producers fta_stat to the trace */
+        /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */
+        memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));
+
+        memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));
+
+        fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);
+               free(trace_buffer);
+        res=2; //indicate that it is a temporal tuple
+    }
+    return res;
+}