/* ------------------------------------------------
 Copyright 2014 AT&T Intellectual Property
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 ------------------------------------------- */

#include "gsconfig.h"
#include "gstypes.h"
#include "app.h"
#include "fta.h"
#include "lapp.h"
#include "string.h"
#include "stdio.h"
#include "stdlib.h"
#include "schemaparser.h"
#include "gshub.h"

// Defined here to avoid link errors as this array is auto generated for the
// lfta and referenced in the clearinghouse library which gets linked against
// the hfta
gs_sp_t fta_names[]={0};

/* HIGH LEVEL APPLICATION INTERFACE */
/* ================================ */

/*
 * One entry of the table that maps the FTAID handed to the application onto
 * the FTA instance behind it and the parsed schema of its output stream.
 */
struct fta_instance {
    gs_sp_t name;              /* FTA name */
    FTAID ftaid;               /* id passed to fta_control()/fta_free_instance() */
    gs_int32_t used;           /* presumably non-zero while the slot is occupied - TODO confirm */
    gs_schemahandle_t schema;  /* released with ftaschema_free() in ftaapp_remove_fta() */
};

/* Instance table; allocated with STARTSZ entries on the first add_fta() call. */
struct fta_instance * instance_array=0;
/* Number of entries currently allocated in instance_array. */
gs_int32_t instance_array_sz=0;

/*
 * Record a new FTA instance in instance_array, allocating and zeroing the
 * table on first use. Returns -1 when that allocation fails.
 */
static gs_retval_t add_fta(gs_sp_t name, FTAID ftaid, gs_schemahandle_t schema)
{
    gs_int32_t x;

    if ( instance_array_sz == 0) {
        if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {
            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
            return -1;
        }
        memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);
        instance_array_sz = STARTSZ;
    }

    /*
     * NOTE(review): this copy of the file is corrupted from here on. The text
     * between "for(x=0;(x" and "schema; }" has been lost. That removes the
     * rest of add_fta() and whatever definitions lay between it and the tail
     * of the function that ends in "schema; }". rm_fta() and get_fta(), both
     * called further down, are therefore not defined anywhere in this copy.
     * The fragment is kept verbatim; restore the missing text from a pristine
     * copy of the file before building.
     */
    for(x=0;(xschema; }

/*
 * Look up the FTA called `name` with fta_find() and parse the schema text it
 * reports.
 *
 * Returns the handle produced by ftaschema_parse_string(), or -1 when the FTA
 * cannot be found or its schema cannot be parsed.
 */
gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)
{
    FTAID f;
    gs_int8_t schemabuf[MAXSCHEMASZ];
    gs_schemahandle_t schema;

    if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
        return -1;
    }
    if ((schema=ftaschema_parse_string(schemabuf))<0) {
        gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");
        return -1;
    }
    return schema;
}

/*
 * Get the ASCII schema definition of the FTA called `name`.
 *
 * Returns a pointer to a function-local static buffer filled in by
 * fta_find(), or 0 when the FTA cannot be found. The buffer is overwritten by
 * the next call, so the caller must copy the text if it has to survive, and
 * must not free it.
 */
gs_sp_t ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)
{
    FTAID f;
    static gs_int8_t schemabuf[MAXSCHEMASZ];

    if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
        gslog(LOG_EMERG,"ftaapp_get_fta_ascii_schema_by_name::error:could not find FTA\n");
        return 0;
    }
    return schemabuf;
}

/*
 * Send a control operation to the FTA instance registered under `f`.
 *
 * `command`, `sz` and `data` are passed to fta_control() unchanged.
 * Returns the result of fta_control(), or -1 when `f` is not a known
 * instance.
 */
gs_retval_t ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void * data)
{
    struct fta_instance * fi;

    /*
     * The lookup result is a pointer, so the failed case has to be tested
     * against NULL; an ordered "< 0" comparison never catches it.
     * NOTE(review): get_fta() is not defined in this copy of the file; it is
     * assumed to return NULL when nothing matches - confirm.
     */
    if ((fi=get_fta(f))==NULL) {
        gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
        return -1;
    }
    return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);
}

/*
 * Remove the FTA instance registered under `f`.
 *
 * Frees the instance with fta_free_instance() (`recursive` is passed
 * through), releases its schema with ftaschema_free() and drops the table
 * entry with rm_fta(). Returns 0 on success and -1 when `f` is not a known
 * instance; results of the three calls are not inspected.
 */
gs_retval_t ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)
{
    struct fta_instance * fi;

    /* NOTE(review): assumes get_fta() returns NULL on a miss - confirm. */
    if ((fi=get_fta(f))==NULL) {
        gslog(LOG_EMERG,"ftaapp_remove_fta::error:unknown streamid\n");
        return -1;
    }
    fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);
    ftaschema_free(fi->schema);
    rm_fta(f);
    return 0;
}

/*
 * Fetch the next tuple with gscp_get_buffer() and classify it.
 *
 * `ftaid` and `size` are filled in by gscp_get_buffer(); `tbuffer`,
 * `tbuf_size` and `timeout` are passed to it unchanged.
 *
 * Returns:
 *   - any non-zero result of gscp_get_buffer(), unchanged;
 *   - 0 for a tuple that is not a temporal tuple;
 *   - 2 for a temporal tuple;
 *   - -1 when the tuple belongs to a stream that is not in the instance
 *     table.
 *
 * A temporal tuple whose trace cannot be extracted, or for which the trace
 * copy cannot be allocated, is logged and dropped, and the next tuple is
 * fetched with the same timeout.
 */
gs_retval_t ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,
                             gs_int32_t tbuf_size, gs_int32_t timeout)
{
    for (;;) {
        struct fta_instance * fi;
        gs_uint64_t trace_id;
        gs_uint32_t sz;
        fta_stat * trace;
        fta_stat * trace_buffer;
        FTAID myftaid;
        gs_retval_t res;

        res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);
        if (res!=0) {
            return res;
        }

        /*
         * Look the stream up once and check the result before touching its
         * schema.
         * NOTE(review): assumes get_fta() returns NULL on a miss - confirm.
         */
        fi=get_fta(*ftaid);
        if (fi==NULL) {
            gslog(LOG_EMERG,"ftaapp_get_tuple::error:unknown streamid\n");
            return -1;
        }

        if (!ftaschema_is_temporal_tuple(fi->schema, tbuffer)) {
            return res;
        }

        myftaid=gscpipc_getftaid();

        /* extract trace */
        if (ftaschema_get_trace(fi->schema, tbuffer, *size, &trace_id, &sz, &trace)) {
            gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");
            continue;
        }

        /*
         * Room for the received trace plus one extra entry. calloc() checks
         * the size multiplication for overflow and returns zeroed memory, so
         * the extra entry starts out cleared.
         */
        trace_buffer=calloc((size_t)sz+1, sizeof(fta_stat));
        if (trace_buffer==NULL) {
            gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");
            continue;
        }

        /* generate a heartbeat */
        memcpy(trace_buffer, trace, (size_t)sz * sizeof(fta_stat));
        /* append producers fta_stat to the trace */
        /* for now only the leading FTAID bytes of the extra entry are filled
           in, the rest stays cleared */
        memcpy(&trace_buffer[sz], &myftaid, sizeof(FTAID));

        /* disable heartbeats for now to avoid overloading clearinghouse */
        /* fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, trace_buffer); */

        free(trace_buffer);
        return 2; /* indicate that it is a temporal tuple */
    }
}