-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
- \r
- http://www.apache.org/licenses/LICENSE-2.0\r
- \r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include "app.h"\r
-#include "fta.h"\r
-#include "lapp.h"\r
-#include "string.h"\r
-#include "stdio.h"\r
-#include "stdlib.h"\r
-#include "schemaparser.h"\r
-#include "gshub.h"\r
-\r
-\r
-// Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta\r
-gs_sp_t fta_names[]={0};\r
-\r
-\r
-/* HIGH LEVEL APPLICATION INTERFACE */\r
-/* ================================ */\r
-\r
-struct fta_instance {\r
- gs_sp_t name;\r
- FTAID ftaid;\r
- gs_int32_t used;\r
- gs_schemahandle_t schema;\r
-};\r
-\r
-\r
-struct fta_instance * instance_array=0;\r
-gs_int32_t instance_array_sz=0;\r
-\r
-static gs_retval_t\r
-add_fta(gs_sp_t name, FTAID ftaid,\r
- gs_schemahandle_t schema)\r
-{\r
- gs_int32_t x;\r
- if ( instance_array_sz == 0) {\r
- if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);\r
- instance_array_sz = STARTSZ;\r
- }\r
- for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);\r
- if (x == instance_array_sz) {\r
- gs_int32_t y;\r
- instance_array_sz = 2*instance_array_sz;\r
- if ((instance_array =\r
- realloc(instance_array,instance_array_sz*\r
- sizeof(struct fta_instance)))==0) {\r
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
- return -1;\r
- }\r
- for (y=x;y<instance_array_sz;x++)\r
- instance_array[y].used=0;\r
- }\r
- instance_array[x].name=strdup(name);\r
- instance_array[x].ftaid=ftaid;\r
- instance_array[x].schema=schema;\r
- instance_array[x].used=1;\r
- return 0;\r
-}\r
-\r
-static gs_retval_t\r
-rm_fta(FTAID ftaid)\r
-{\r
- gs_int32_t x;\r
- for (x=0;x<instance_array_sz;x++) {\r
- if ( (instance_array[x].ftaid.ip=ftaid.ip)\r
- && (instance_array[x].ftaid.port==ftaid.port)\r
- && (instance_array[x].ftaid.index==ftaid.index)\r
- && (instance_array[x].ftaid.streamid==ftaid.streamid)){\r
- instance_array[x].used=0;\r
- }\r
- }\r
- return 0;\r
-}\r
-\r
-\r
-static struct fta_instance *\r
-get_fta(FTAID ftaid)\r
-{\r
- gs_int32_t x;\r
- for (x=0;x<instance_array_sz;x++) {\r
- if (( instance_array[x].used!=0 )\r
- && (instance_array[x].ftaid.ip==ftaid.ip)\r
- && (instance_array[x].ftaid.port==ftaid.port)\r
- && (instance_array[x].ftaid.index==ftaid.index)\r
- && (instance_array[x].ftaid.streamid==ftaid.streamid))\r
- {\r
- return &instance_array[x];\r
- }\r
- }\r
- return 0;\r
-}\r
-\r
-\r
-\r
-gs_retval_t\r
-ftaapp_init(gs_uint32_t bufsz)\r
-{\r
- \r
- endpoint gshub;\r
- FTAID myftaid;\r
- gs_sp_t name = "app\0";\r
- if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {\r
- gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");\r
- return -1;\r
- }\r
- if (get_hub(&gshub)!=0) {\r
- gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");\r
- return -1;\r
- }\r
- myftaid=gscpipc_getftaid();\r
- if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {\r
- gslog(LOG_EMERG,"ERROR:could not set_ftainstance");\r
- return -1;\r
- }\r
- return 0;\r
-}\r
-\r
-/* this should be used before exiting to make sure everything gets\r
- cleaned up\r
- */\r
-gs_retval_t\r
-ftaapp_exit()\r
-{\r
- gs_int32_t x;\r
- for (x=0;x<instance_array_sz;x++) {\r
- if (instance_array[x].used!=0) {\r
- ftaapp_remove_fta(instance_array[x].ftaid,1);\r
- }\r
- }\r
- hostlib_free();\r
- return 0;\r
-}\r
-\r
-/* adds an FTA by key returns unique streamid which can be used to reference FTA*/\r
-\r
-\r
-FTAID\r
-ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,\r
- gs_int32_t command, gs_int32_t sz, void * data)\r
-{\r
- gs_int8_t schemabuf[MAXSCHEMASZ];\r
- gs_schemahandle_t schema;\r
- FTAID f;\r
- FTAID ferr;\r
- ferr.ip=0;\r
- ferr.port=0;\r
- ferr.index=0;\r
- ferr.streamid=0;\r
- \r
- \r
- if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");\r
- return ferr;\r
- }\r
- \r
- \r
- if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");\r
- fprintf(stderr,"/n%s\n",schemabuf);\r
- return ferr;\r
- }\r
- \r
- if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,\r
- command,sz,data))!=0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
- ftaschema_free(schema);\r
- return ferr;\r
- }\r
- \r
- if (f.streamid==0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
- ftaschema_free(schema);\r
- return ferr;\r
- }\r
- \r
- //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);\r
- if (add_fta((gs_sp_t)name,f,schema)<0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");\r
- fta_free_instance(gscpipc_getftaid(),f,1);\r
- ftaschema_free(schema);\r
- return ferr;\r
- }\r
- \r
- return f;\r
-}\r
-\r
-FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,\r
- gs_int32_t command, gs_int32_t sz,\r
- void * data,gs_sp_t path,\r
- gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,\r
- gs_uint32_t delta, gs_uint32_t split) {\r
- gs_int8_t schemabuf[MAXSCHEMASZ];\r
- gs_schemahandle_t schema;\r
- FTAID f;\r
- FTAID ferr;\r
- ferr.ip=0;\r
- ferr.port=0;\r
- ferr.index=0;\r
- ferr.streamid=0;\r
- \r
- \r
- if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");\r
- return ferr;\r
- }\r
- \r
- if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");\r
- return ferr;\r
- }\r
- \r
- if ((fta_alloc_print_instance(gscpipc_getftaid(),\r
- &f,name,schemabuf,reusable,\r
- command,sz,data,path,\r
- basename,temporal_field,split_field,delta,split))!=0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
- ftaschema_free(schema);\r
- return ferr;\r
- }\r
- \r
- if (f.streamid==0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
- ftaschema_free(schema);\r
- return ferr;\r
- }\r
- \r
- if (add_fta((gs_sp_t)name,f,schema)<0) {\r
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");\r
- fta_free_instance(gscpipc_getftaid(),f,1);\r
- ftaschema_free(schema);\r
- return ferr;\r
- }\r
- \r
- return f;\r
-}\r
-\r
-/* get the schema definition of an FTA */\r
-gs_schemahandle_t\r
-ftaapp_get_fta_schema(FTAID ftaid)\r
-{\r
- struct fta_instance * fi;\r
- \r
- //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);\r
- \r
- if ((fi=get_fta(ftaid))==0) {\r
- gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");\r
- return -1;\r
- }\r
- return fi->schema;\r
-}\r
-\r
-/* get the asci schema definition for the FTA associated with the FTA name */\r
-gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)\r
-{\r
- FTAID f;\r
- gs_int8_t schemabuf[MAXSCHEMASZ];\r
- gs_schemahandle_t schema;\r
- if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
- gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");\r
- return -1;\r
- }\r
- if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
- gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");\r
- return -1;\r
- }\r
- return schema;\r
-}\r
-\r
-/* get the asci schema definition for the FTA associated with the FTA name */\r
-gs_sp_t ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)\r
-{\r
- FTAID f;\r
- static gs_int8_t schemabuf[MAXSCHEMASZ];\r
- if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
- gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");\r
- return 0;\r
- }\r
- return schemabuf;\r
-}\r
-\r
-/* control operations keyed of one to one mapping of stream id */\r
-gs_retval_t\r
-ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void * data)\r
-{\r
- struct fta_instance * fi;\r
- if ((fi=get_fta(f))<0) {\r
- gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");\r
- return -1;\r
- }\r
- return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);\r
-}\r
-\r
-/* remove FTA keyed of stream id */\r
-gs_retval_t\r
-ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)\r
-{\r
- struct fta_instance * fi;\r
- if ((fi=get_fta(f))<0) {\r
- gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");\r
- return -1;\r
- }\r
- \r
- fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);\r
- ftaschema_free(fi->schema);\r
- rm_fta(f);\r
- \r
- return 0;\r
-}\r
-\r
-/* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove\r
- the heartbeat tuples*/\r
-gs_retval_t\r
-ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,\r
- gs_int32_t tbuf_size, gs_int32_t timeout)\r
-{\r
- gs_uint64_t trace_id;\r
- gs_uint32_t sz;\r
- fta_stat * trace;\r
- gs_sp_t trace_buffer;\r
- gs_retval_t res;\r
- \r
-get_tuple_again:\r
- res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);\r
- \r
- if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {\r
- FTAID myftaid;\r
- myftaid=gscpipc_getftaid();\r
- /* extract trace */\r
- if (ftaschema_get_trace(get_fta(*ftaid)->schema,\r
- tbuffer, *size, &trace_id, &sz, &trace))\r
- {\r
- gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");\r
- goto get_tuple_again;\r
- }\r
- \r
- if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {\r
- gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");\r
- goto get_tuple_again;\r
- }\r
- \r
- /* generate a heartbeat */\r
- memcpy(trace_buffer, trace, sz * sizeof(fta_stat));\r
- /* append producers fta_stat to the trace */\r
- /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */\r
- memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));\r
-\r
- memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));\r
-\r
- fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);\r
- free(trace_buffer);\r
- res=2; //indicate that it is a temporal tuple\r
- }\r
- return res;\r
-}\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "app.h"
+#include "fta.h"
+#include "lapp.h"
+#include "string.h"
+#include "stdio.h"
+#include "stdlib.h"
+#include "schemaparser.h"
+#include "gshub.h"
+
+
+// Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta
+gs_sp_t fta_names[]={0};
+
+
+/* HIGH LEVEL APPLICATION INTERFACE */
+/* ================================ */
+
+struct fta_instance {
+ gs_sp_t name;
+ FTAID ftaid;
+ gs_int32_t used;
+ gs_schemahandle_t schema;
+};
+
+
+struct fta_instance * instance_array=0;
+gs_int32_t instance_array_sz=0;
+
+static gs_retval_t
+add_fta(gs_sp_t name, FTAID ftaid,
+ gs_schemahandle_t schema)
+{
+ gs_int32_t x;
+ if ( instance_array_sz == 0) {
+ if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);
+ instance_array_sz = STARTSZ;
+ }
+ for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);
+ if (x == instance_array_sz) {
+ gs_int32_t y;
+ instance_array_sz = 2*instance_array_sz;
+ if ((instance_array =
+ realloc(instance_array,instance_array_sz*
+ sizeof(struct fta_instance)))==0) {
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
+ return -1;
+ }
+ for (y=x;y<instance_array_sz;x++)
+ instance_array[y].used=0;
+ }
+ instance_array[x].name=strdup(name);
+ instance_array[x].ftaid=ftaid;
+ instance_array[x].schema=schema;
+ instance_array[x].used=1;
+ return 0;
+}
+
+static gs_retval_t
+rm_fta(FTAID ftaid)
+{
+ gs_int32_t x;
+ for (x=0;x<instance_array_sz;x++) {
+ if ( (instance_array[x].ftaid.ip=ftaid.ip)
+ && (instance_array[x].ftaid.port==ftaid.port)
+ && (instance_array[x].ftaid.index==ftaid.index)
+ && (instance_array[x].ftaid.streamid==ftaid.streamid)){
+ instance_array[x].used=0;
+ }
+ }
+ return 0;
+}
+
+
+static struct fta_instance *
+get_fta(FTAID ftaid)
+{
+ gs_int32_t x;
+ for (x=0;x<instance_array_sz;x++) {
+ if (( instance_array[x].used!=0 )
+ && (instance_array[x].ftaid.ip==ftaid.ip)
+ && (instance_array[x].ftaid.port==ftaid.port)
+ && (instance_array[x].ftaid.index==ftaid.index)
+ && (instance_array[x].ftaid.streamid==ftaid.streamid))
+ {
+ return &instance_array[x];
+ }
+ }
+ return 0;
+}
+
+
+
+gs_retval_t
+ftaapp_init(gs_uint32_t bufsz)
+{
+
+ endpoint gshub;
+ FTAID myftaid;
+ gs_sp_t name = "app\0";
+ if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {
+ gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");
+ return -1;
+ }
+ if (get_hub(&gshub)!=0) {
+ gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");
+ return -1;
+ }
+ myftaid=gscpipc_getftaid();
+ if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {
+ gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
+ return -1;
+ }
+ return 0;
+}
+
+/* this should be used before exiting to make sure everything gets
+ cleaned up
+ */
+gs_retval_t
+ftaapp_exit()
+{
+ gs_int32_t x;
+ for (x=0;x<instance_array_sz;x++) {
+ if (instance_array[x].used!=0) {
+ ftaapp_remove_fta(instance_array[x].ftaid,1);
+ }
+ }
+ hostlib_free();
+ return 0;
+}
+
+/* adds an FTA by key returns unique streamid which can be used to reference FTA*/
+
+
+FTAID
+ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
+ gs_int32_t command, gs_int32_t sz, void * data)
+{
+ gs_int8_t schemabuf[MAXSCHEMASZ];
+ gs_schemahandle_t schema;
+ FTAID f;
+ FTAID ferr;
+ ferr.ip=0;
+ ferr.port=0;
+ ferr.index=0;
+ ferr.streamid=0;
+
+
+ if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
+ return ferr;
+ }
+
+
+ if ((schema=ftaschema_parse_string(schemabuf))<0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
+ fprintf(stderr,"/n%s\n",schemabuf);
+ return ferr;
+ }
+
+ if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,
+ command,sz,data))!=0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+ ftaschema_free(schema);
+ return ferr;
+ }
+
+ if (f.streamid==0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+ ftaschema_free(schema);
+ return ferr;
+ }
+
+ //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);
+ if (add_fta((gs_sp_t)name,f,schema)<0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
+ fta_free_instance(gscpipc_getftaid(),f,1);
+ ftaschema_free(schema);
+ return ferr;
+ }
+
+ return f;
+}
+
+FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
+ gs_int32_t command, gs_int32_t sz,
+ void * data,gs_sp_t path,
+ gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,
+ gs_uint32_t delta, gs_uint32_t split) {
+ gs_int8_t schemabuf[MAXSCHEMASZ];
+ gs_schemahandle_t schema;
+ FTAID f;
+ FTAID ferr;
+ ferr.ip=0;
+ ferr.port=0;
+ ferr.index=0;
+ ferr.streamid=0;
+
+
+ if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
+ return ferr;
+ }
+
+ if ((schema=ftaschema_parse_string(schemabuf))<0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
+ return ferr;
+ }
+
+ if ((fta_alloc_print_instance(gscpipc_getftaid(),
+ &f,name,schemabuf,reusable,
+ command,sz,data,path,
+ basename,temporal_field,split_field,delta,split))!=0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+ ftaschema_free(schema);
+ return ferr;
+ }
+
+ if (f.streamid==0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
+ ftaschema_free(schema);
+ return ferr;
+ }
+
+ if (add_fta((gs_sp_t)name,f,schema)<0) {
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
+ fta_free_instance(gscpipc_getftaid(),f,1);
+ ftaschema_free(schema);
+ return ferr;
+ }
+
+ return f;
+}
+
+/* get the schema definition of an FTA */
+gs_schemahandle_t
+ftaapp_get_fta_schema(FTAID ftaid)
+{
+ struct fta_instance * fi;
+
+ //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);
+
+ if ((fi=get_fta(ftaid))==0) {
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");
+ return -1;
+ }
+ return fi->schema;
+}
+
+/* get the asci schema definition for the FTA associated with the FTA name */
+gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)
+{
+ FTAID f;
+ gs_int8_t schemabuf[MAXSCHEMASZ];
+ gs_schemahandle_t schema;
+ if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
+ return -1;
+ }
+ if ((schema=ftaschema_parse_string(schemabuf))<0) {
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");
+ return -1;
+ }
+ return schema;
+}
+
+/* get the asci schema definition for the FTA associated with the FTA name */
+gs_sp_t ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)
+{
+ FTAID f;
+ static gs_int8_t schemabuf[MAXSCHEMASZ];
+ if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
+ return 0;
+ }
+ return schemabuf;
+}
+
+/* control operations keyed of one to one mapping of stream id */
+gs_retval_t
+ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void * data)
+{
+ struct fta_instance * fi;
+ if ((fi=get_fta(f))<0) {
+ gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
+ return -1;
+ }
+ return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);
+}
+
+/* remove FTA keyed of stream id */
+gs_retval_t
+ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)
+{
+ struct fta_instance * fi;
+ if ((fi=get_fta(f))<0) {
+ gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
+ return -1;
+ }
+
+ fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);
+ ftaschema_free(fi->schema);
+ rm_fta(f);
+
+ return 0;
+}
+
+/* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove
+ the heartbeat tuples*/
+gs_retval_t
+ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,
+ gs_int32_t tbuf_size, gs_int32_t timeout)
+{
+ gs_uint64_t trace_id;
+ gs_uint32_t sz;
+ fta_stat * trace;
+ gs_sp_t trace_buffer;
+ gs_retval_t res;
+
+get_tuple_again:
+ res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);
+
+ if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {
+ FTAID myftaid;
+ myftaid=gscpipc_getftaid();
+ /* extract trace */
+ if (ftaschema_get_trace(get_fta(*ftaid)->schema,
+ tbuffer, *size, &trace_id, &sz, &trace))
+ {
+ gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");
+ goto get_tuple_again;
+ }
+
+ if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {
+ gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");
+ goto get_tuple_again;
+ }
+
+ /* generate a heartbeat */
+ memcpy(trace_buffer, trace, sz * sizeof(fta_stat));
+ /* append producers fta_stat to the trace */
+ /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */
+ memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));
+
+ memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));
+
+ fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);
+ free(trace_buffer);
+ res=2; //indicate that it is a temporal tuple
+ }
+ return res;
+}