-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-#include "gsconfig.h"
-#include "gstypes.h"
-#include "app.h"
-#include "fta.h"
-#include "lapp.h"
-#include "string.h"
-#include "stdio.h"
-#include "stdlib.h"
-#include "schemaparser.h"
-#include "gshub.h"
-
-
-// Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta
-gs_sp_t fta_names[]={0};
-
-
-/* HIGH LEVEL APPLICATION INTERFACE */
-/* ================================ */
-
-struct fta_instance {
- gs_sp_t name;
- FTAID ftaid;
- gs_int32_t used;
- gs_schemahandle_t schema;
-};
-
-
-struct fta_instance * instance_array=0;
-gs_int32_t instance_array_sz=0;
-
-static gs_retval_t
-add_fta(gs_sp_t name, FTAID ftaid,
- gs_schemahandle_t schema)
-{
- gs_int32_t x;
- if ( instance_array_sz == 0) {
- if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);
- instance_array_sz = STARTSZ;
- }
- for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);
- if (x == instance_array_sz) {
- gs_int32_t y;
- instance_array_sz = 2*instance_array_sz;
- if ((instance_array =
- realloc(instance_array,instance_array_sz*
- sizeof(struct fta_instance)))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- for (y=x;y<instance_array_sz;x++)
- instance_array[y].used=0;
- }
- instance_array[x].name=strdup(name);
- instance_array[x].ftaid=ftaid;
- instance_array[x].schema=schema;
- instance_array[x].used=1;
- return 0;
-}
-
-static gs_retval_t
-rm_fta(FTAID ftaid)
-{
- gs_int32_t x;
- for (x=0;x<instance_array_sz;x++) {
- if ( (instance_array[x].ftaid.ip=ftaid.ip)
- && (instance_array[x].ftaid.port==ftaid.port)
- && (instance_array[x].ftaid.index==ftaid.index)
- && (instance_array[x].ftaid.streamid==ftaid.streamid)){
- instance_array[x].used=0;
- }
- }
- return 0;
-}
-
-
-static struct fta_instance *
-get_fta(FTAID ftaid)
-{
- gs_int32_t x;
- for (x=0;x<instance_array_sz;x++) {
- if (( instance_array[x].used!=0 )
- && (instance_array[x].ftaid.ip==ftaid.ip)
- && (instance_array[x].ftaid.port==ftaid.port)
- && (instance_array[x].ftaid.index==ftaid.index)
- && (instance_array[x].ftaid.streamid==ftaid.streamid))
- {
- return &instance_array[x];
- }
- }
- return 0;
-}
-
-
-
-gs_retval_t
-ftaapp_init(gs_uint32_t bufsz)
-{
-
- endpoint gshub;
- FTAID myftaid;
- gs_sp_t name = "app\0";
- if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {
- gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");
- return -1;
- }
- if (get_hub(&gshub)!=0) {
- gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");
- return -1;
- }
- myftaid=gscpipc_getftaid();
- if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {
- gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
- return -1;
- }
- return 0;
-}
-
-/* this should be used before exiting to make sure everything gets
- cleaned up
- */
-gs_retval_t
-ftaapp_exit()
-{
- gs_int32_t x;
- for (x=0;x<instance_array_sz;x++) {
- if (instance_array[x].used!=0) {
- ftaapp_remove_fta(instance_array[x].ftaid,1);
- }
- }
- hostlib_free();
- return 0;
-}
-
-/* adds an FTA by key returns unique streamid which can be used to reference FTA*/
-
-
-FTAID
-ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
- gs_int32_t command, gs_int32_t sz, void * data)
-{
- gs_int8_t schemabuf[MAXSCHEMASZ];
- gs_schemahandle_t schema;
- FTAID f;
- FTAID ferr;
- ferr.ip=0;
- ferr.port=0;
- ferr.index=0;
- ferr.streamid=0;
-
-
- if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
- return ferr;
- }
-
-
- if ((schema=ftaschema_parse_string(schemabuf))<0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
- fprintf(stderr,"/n%s\n",schemabuf);
- return ferr;
- }
-
- if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,
- command,sz,data))!=0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
- ftaschema_free(schema);
- return ferr;
- }
-
- if (f.streamid==0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
- ftaschema_free(schema);
- return ferr;
- }
-
- //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);
- if (add_fta((gs_sp_t)name,f,schema)<0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
- fta_free_instance(gscpipc_getftaid(),f,1);
- ftaschema_free(schema);
- return ferr;
- }
-
- return f;
-}
-
-FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
- gs_int32_t command, gs_int32_t sz,
- void * data,gs_sp_t path,
- gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,
- gs_uint32_t delta, gs_uint32_t split) {
- gs_int8_t schemabuf[MAXSCHEMASZ];
- gs_schemahandle_t schema;
- FTAID f;
- FTAID ferr;
- ferr.ip=0;
- ferr.port=0;
- ferr.index=0;
- ferr.streamid=0;
-
-
- if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
- return ferr;
- }
-
- if ((schema=ftaschema_parse_string(schemabuf))<0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
- return ferr;
- }
-
- if ((fta_alloc_print_instance(gscpipc_getftaid(),
- &f,name,schemabuf,reusable,
- command,sz,data,path,
- basename,temporal_field,split_field,delta,split))!=0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
- ftaschema_free(schema);
- return ferr;
- }
-
- if (f.streamid==0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
- ftaschema_free(schema);
- return ferr;
- }
-
- if (add_fta((gs_sp_t)name,f,schema)<0) {
- gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
- fta_free_instance(gscpipc_getftaid(),f,1);
- ftaschema_free(schema);
- return ferr;
- }
-
- return f;
-}
-
-/* get the schema definition of an FTA */
-gs_schemahandle_t
-ftaapp_get_fta_schema(FTAID ftaid)
-{
- struct fta_instance * fi;
-
- //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);
-
- if ((fi=get_fta(ftaid))==0) {
- gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");
- return -1;
- }
- return fi->schema;
-}
-
-/* get the asci schema definition for the FTA associated with the FTA name */
-gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)
-{
- FTAID f;
- gs_int8_t schemabuf[MAXSCHEMASZ];
- gs_schemahandle_t schema;
- if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
- gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
- return -1;
- }
- if ((schema=ftaschema_parse_string(schemabuf))<0) {
- gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");
- return -1;
- }
- return schema;
-}
-
-/* get the asci schema definition for the FTA associated with the FTA name */
-gs_sp_t ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)
-{
- FTAID f;
- static gs_int8_t schemabuf[MAXSCHEMASZ];
- if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
- gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
- return 0;
- }
- return schemabuf;
-}
-
-/* control operations keyed of one to one mapping of stream id */
-gs_retval_t
-ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void * data)
-{
- struct fta_instance * fi;
- if ((fi=get_fta(f))<0) {
- gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
- return -1;
- }
- return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);
-}
-
-/* remove FTA keyed of stream id */
-gs_retval_t
-ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)
-{
- struct fta_instance * fi;
- if ((fi=get_fta(f))<0) {
- gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
- return -1;
- }
-
- fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);
- ftaschema_free(fi->schema);
- rm_fta(f);
-
- return 0;
-}
-
-/* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove
- the heartbeat tuples*/
-gs_retval_t
-ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,
- gs_int32_t tbuf_size, gs_int32_t timeout)
-{
- gs_uint64_t trace_id;
- gs_uint32_t sz;
- fta_stat * trace;
- gs_sp_t trace_buffer;
- gs_retval_t res;
-
-get_tuple_again:
- res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);
-
- if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {
- FTAID myftaid;
- myftaid=gscpipc_getftaid();
- /* extract trace */
- if (ftaschema_get_trace(get_fta(*ftaid)->schema,
- tbuffer, *size, &trace_id, &sz, &trace))
- {
- gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");
- goto get_tuple_again;
- }
-
- if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {
- gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");
- goto get_tuple_again;
- }
-
- /* generate a heartbeat */
- memcpy(trace_buffer, trace, sz * sizeof(fta_stat));
- /* append producers fta_stat to the trace */
- /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */
- memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));
-
- memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));
-
- fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);
- free(trace_buffer);
- res=2; //indicate that it is a temporal tuple
- }
- return res;
-}
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+ \r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+ \r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include "app.h"\r
+#include "fta.h"\r
+#include "lapp.h"\r
+#include "string.h"\r
+#include "stdio.h"\r
+#include "stdlib.h"\r
+#include "schemaparser.h"\r
+#include "gshub.h"\r
+\r
+\r
+// Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta\r
+gs_sp_t fta_names[]={0};\r
+\r
+\r
+/* HIGH LEVEL APPLICATION INTERFACE */\r
+/* ================================ */\r
+\r
+struct fta_instance {\r
+ gs_sp_t name;\r
+ FTAID ftaid;\r
+ gs_int32_t used;\r
+ gs_schemahandle_t schema;\r
+};\r
+\r
+\r
+struct fta_instance * instance_array=0;\r
+gs_int32_t instance_array_sz=0;\r
+\r
+static gs_retval_t\r
+add_fta(gs_sp_t name, FTAID ftaid,\r
+ gs_schemahandle_t schema)\r
+{\r
+ gs_int32_t x;\r
+ if ( instance_array_sz == 0) {\r
+ if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);\r
+ instance_array_sz = STARTSZ;\r
+ }\r
+ for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);\r
+ if (x == instance_array_sz) {\r
+ gs_int32_t y;\r
+ instance_array_sz = 2*instance_array_sz;\r
+ if ((instance_array =\r
+ realloc(instance_array,instance_array_sz*\r
+ sizeof(struct fta_instance)))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ for (y=x;y<instance_array_sz;x++)\r
+ instance_array[y].used=0;\r
+ }\r
+ instance_array[x].name=strdup(name);\r
+ instance_array[x].ftaid=ftaid;\r
+ instance_array[x].schema=schema;\r
+ instance_array[x].used=1;\r
+ return 0;\r
+}\r
+\r
+static gs_retval_t\r
+rm_fta(FTAID ftaid)\r
+{\r
+ gs_int32_t x;\r
+ for (x=0;x<instance_array_sz;x++) {\r
+ if ( (instance_array[x].ftaid.ip=ftaid.ip)\r
+ && (instance_array[x].ftaid.port==ftaid.port)\r
+ && (instance_array[x].ftaid.index==ftaid.index)\r
+ && (instance_array[x].ftaid.streamid==ftaid.streamid)){\r
+ instance_array[x].used=0;\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+\r
+static struct fta_instance *\r
+get_fta(FTAID ftaid)\r
+{\r
+ gs_int32_t x;\r
+ for (x=0;x<instance_array_sz;x++) {\r
+ if (( instance_array[x].used!=0 )\r
+ && (instance_array[x].ftaid.ip==ftaid.ip)\r
+ && (instance_array[x].ftaid.port==ftaid.port)\r
+ && (instance_array[x].ftaid.index==ftaid.index)\r
+ && (instance_array[x].ftaid.streamid==ftaid.streamid))\r
+ {\r
+ return &instance_array[x];\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+\r
+\r
+gs_retval_t\r
+ftaapp_init(gs_uint32_t bufsz)\r
+{\r
+ \r
+ endpoint gshub;\r
+ FTAID myftaid;\r
+ gs_sp_t name = "app\0";\r
+ if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {\r
+ gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");\r
+ return -1;\r
+ }\r
+ if (get_hub(&gshub)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");\r
+ return -1;\r
+ }\r
+ myftaid=gscpipc_getftaid();\r
+ if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:could not set_ftainstance");\r
+ return -1;\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* this should be used before exiting to make sure everything gets\r
+ cleaned up\r
+ */\r
+gs_retval_t\r
+ftaapp_exit()\r
+{\r
+ gs_int32_t x;\r
+ for (x=0;x<instance_array_sz;x++) {\r
+ if (instance_array[x].used!=0) {\r
+ ftaapp_remove_fta(instance_array[x].ftaid,1);\r
+ }\r
+ }\r
+ hostlib_free();\r
+ return 0;\r
+}\r
+\r
+/* adds an FTA by key returns unique streamid which can be used to reference FTA*/\r
+\r
+\r
+FTAID\r
+ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,\r
+ gs_int32_t command, gs_int32_t sz, void * data)\r
+{\r
+ gs_int8_t schemabuf[MAXSCHEMASZ];\r
+ gs_schemahandle_t schema;\r
+ FTAID f;\r
+ FTAID ferr;\r
+ ferr.ip=0;\r
+ ferr.port=0;\r
+ ferr.index=0;\r
+ ferr.streamid=0;\r
+ \r
+ \r
+ if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");\r
+ return ferr;\r
+ }\r
+ \r
+ \r
+ if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");\r
+ fprintf(stderr,"/n%s\n",schemabuf);\r
+ return ferr;\r
+ }\r
+ \r
+ if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,\r
+ command,sz,data))!=0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
+ ftaschema_free(schema);\r
+ return ferr;\r
+ }\r
+ \r
+ if (f.streamid==0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
+ ftaschema_free(schema);\r
+ return ferr;\r
+ }\r
+ \r
+ //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);\r
+ if (add_fta((gs_sp_t)name,f,schema)<0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");\r
+ fta_free_instance(gscpipc_getftaid(),f,1);\r
+ ftaschema_free(schema);\r
+ return ferr;\r
+ }\r
+ \r
+ return f;\r
+}\r
+\r
+FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,\r
+ gs_int32_t command, gs_int32_t sz,\r
+ void * data,gs_sp_t path,\r
+ gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,\r
+ gs_uint32_t delta, gs_uint32_t split) {\r
+ gs_int8_t schemabuf[MAXSCHEMASZ];\r
+ gs_schemahandle_t schema;\r
+ FTAID f;\r
+ FTAID ferr;\r
+ ferr.ip=0;\r
+ ferr.port=0;\r
+ ferr.index=0;\r
+ ferr.streamid=0;\r
+ \r
+ \r
+ if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");\r
+ return ferr;\r
+ }\r
+ \r
+ if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");\r
+ return ferr;\r
+ }\r
+ \r
+ if ((fta_alloc_print_instance(gscpipc_getftaid(),\r
+ &f,name,schemabuf,reusable,\r
+ command,sz,data,path,\r
+ basename,temporal_field,split_field,delta,split))!=0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
+ ftaschema_free(schema);\r
+ return ferr;\r
+ }\r
+ \r
+ if (f.streamid==0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");\r
+ ftaschema_free(schema);\r
+ return ferr;\r
+ }\r
+ \r
+ if (add_fta((gs_sp_t)name,f,schema)<0) {\r
+ gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");\r
+ fta_free_instance(gscpipc_getftaid(),f,1);\r
+ ftaschema_free(schema);\r
+ return ferr;\r
+ }\r
+ \r
+ return f;\r
+}\r
+\r
+/* get the schema definition of an FTA */\r
+gs_schemahandle_t\r
+ftaapp_get_fta_schema(FTAID ftaid)\r
+{\r
+ struct fta_instance * fi;\r
+ \r
+ //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);\r
+ \r
+ if ((fi=get_fta(ftaid))==0) {\r
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");\r
+ return -1;\r
+ }\r
+ return fi->schema;\r
+}\r
+\r
+/* get the asci schema definition for the FTA associated with the FTA name */\r
+gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)\r
+{\r
+ FTAID f;\r
+ gs_int8_t schemabuf[MAXSCHEMASZ];\r
+ gs_schemahandle_t schema;\r
+ if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");\r
+ return -1;\r
+ }\r
+ if ((schema=ftaschema_parse_string(schemabuf))<0) {\r
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");\r
+ return -1;\r
+ }\r
+ return schema;\r
+}\r
+\r
+/* get the asci schema definition for the FTA associated with the FTA name */\r
+gs_sp_t ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)\r
+{\r
+ FTAID f;\r
+ static gs_int8_t schemabuf[MAXSCHEMASZ];\r
+ if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {\r
+ gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");\r
+ return 0;\r
+ }\r
+ return schemabuf;\r
+}\r
+\r
+/* control operations keyed of one to one mapping of stream id */\r
+gs_retval_t\r
+ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void * data)\r
+{\r
+ struct fta_instance * fi;\r
+ if ((fi=get_fta(f))<0) {\r
+ gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");\r
+ return -1;\r
+ }\r
+ return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);\r
+}\r
+\r
+/* remove FTA keyed of stream id */\r
+gs_retval_t\r
+ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)\r
+{\r
+ struct fta_instance * fi;\r
+ if ((fi=get_fta(f))<0) {\r
+ gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");\r
+ return -1;\r
+ }\r
+ \r
+ fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);\r
+ ftaschema_free(fi->schema);\r
+ rm_fta(f);\r
+ \r
+ return 0;\r
+}\r
+\r
+/* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove\r
+ the heartbeat tuples*/\r
+gs_retval_t\r
+ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,\r
+ gs_int32_t tbuf_size, gs_int32_t timeout)\r
+{\r
+ gs_uint64_t trace_id;\r
+ gs_uint32_t sz;\r
+ fta_stat * trace;\r
+ gs_sp_t trace_buffer;\r
+ gs_retval_t res;\r
+ \r
+get_tuple_again:\r
+ res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);\r
+ \r
+ if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {\r
+ FTAID myftaid;\r
+ myftaid=gscpipc_getftaid();\r
+ /* extract trace */\r
+ if (ftaschema_get_trace(get_fta(*ftaid)->schema,\r
+ tbuffer, *size, &trace_id, &sz, &trace))\r
+ {\r
+ gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");\r
+ goto get_tuple_again;\r
+ }\r
+ \r
+ if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {\r
+ gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");\r
+ goto get_tuple_again;\r
+ }\r
+ \r
+ /* generate a heartbeat */\r
+ memcpy(trace_buffer, trace, sz * sizeof(fta_stat));\r
+ /* append producers fta_stat to the trace */\r
+ /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */\r
+ memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));\r
+\r
+ memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));\r
+\r
+ fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);\r
+ free(trace_buffer);\r
+ res=2; //indicate that it is a temporal tuple\r
+ }\r
+ return res;\r
+}\r