1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
23 #include "schemaparser.h"
27 // Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta
28 gs_sp_t fta_names[]={0};
31 /* HIGH LEVEL APPLICATION INTERFACE */
32 /* ================================ */
38 gs_schemahandle_t schema;
42 struct fta_instance * instance_array=0;
43 gs_int32_t instance_array_sz=0;
46 add_fta(gs_sp_t name, FTAID ftaid,
47 gs_schemahandle_t schema)
50 if ( instance_array_sz == 0) {
51 if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {
52 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
55 memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);
56 instance_array_sz = STARTSZ;
58 for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);
59 if (x == instance_array_sz) {
61 instance_array_sz = 2*instance_array_sz;
63 realloc(instance_array,instance_array_sz*
64 sizeof(struct fta_instance)))==0) {
65 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
68 for (y=x;y<instance_array_sz;x++)
69 instance_array[y].used=0;
71 instance_array[x].name=strdup(name);
72 instance_array[x].ftaid=ftaid;
73 instance_array[x].schema=schema;
74 instance_array[x].used=1;
82 for (x=0;x<instance_array_sz;x++) {
83 if ( (instance_array[x].ftaid.ip=ftaid.ip)
84 && (instance_array[x].ftaid.port==ftaid.port)
85 && (instance_array[x].ftaid.index==ftaid.index)
86 && (instance_array[x].ftaid.streamid==ftaid.streamid)){
87 instance_array[x].used=0;
94 static struct fta_instance *
98 for (x=0;x<instance_array_sz;x++) {
99 if (( instance_array[x].used!=0 )
100 && (instance_array[x].ftaid.ip==ftaid.ip)
101 && (instance_array[x].ftaid.port==ftaid.port)
102 && (instance_array[x].ftaid.index==ftaid.index)
103 && (instance_array[x].ftaid.streamid==ftaid.streamid))
105 return &instance_array[x];
114 ftaapp_init(gs_uint32_t bufsz)
119 gs_sp_t name = "app\0";
120 if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {
121 gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");
124 if (get_hub(&gshub)!=0) {
125 gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");
128 myftaid=gscpipc_getftaid();
129 if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {
130 gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
136 /* this should be used before exiting to make sure everything gets
143 for (x=0;x<instance_array_sz;x++) {
144 if (instance_array[x].used!=0) {
145 ftaapp_remove_fta(instance_array[x].ftaid,1);
152 /* adds an FTA by key returns unique streamid which can be used to reference FTA*/
156 ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
157 gs_int32_t command, gs_int32_t sz, void * data)
159 gs_int8_t schemabuf[MAXSCHEMASZ];
160 gs_schemahandle_t schema;
169 if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
170 gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
175 if ((schema=ftaschema_parse_string(schemabuf))<0) {
176 gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
177 fprintf(stderr,"/n%s\n",schemabuf);
181 if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,
182 command,sz,data))!=0) {
183 gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
184 ftaschema_free(schema);
189 gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
190 ftaschema_free(schema);
194 //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);
195 if (add_fta((gs_sp_t)name,f,schema)<0) {
196 gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
197 fta_free_instance(gscpipc_getftaid(),f,1);
198 ftaschema_free(schema);
205 FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
206 gs_int32_t command, gs_int32_t sz,
207 void * data,gs_sp_t path,
208 gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,
209 gs_uint32_t delta, gs_uint32_t split) {
210 gs_int8_t schemabuf[MAXSCHEMASZ];
211 gs_schemahandle_t schema;
220 if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
221 gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
225 if ((schema=ftaschema_parse_string(schemabuf))<0) {
226 gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
230 if ((fta_alloc_print_instance(gscpipc_getftaid(),
231 &f,name,schemabuf,reusable,
232 command,sz,data,path,
233 basename,temporal_field,split_field,delta,split))!=0) {
234 gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
235 ftaschema_free(schema);
240 gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
241 ftaschema_free(schema);
245 if (add_fta((gs_sp_t)name,f,schema)<0) {
246 gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
247 fta_free_instance(gscpipc_getftaid(),f,1);
248 ftaschema_free(schema);
255 /* get the schema definition of an FTA */
257 ftaapp_get_fta_schema(FTAID ftaid)
259 struct fta_instance * fi;
261 //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);
263 if ((fi=get_fta(ftaid))==0) {
264 gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");
270 /* get the asci schema definition for the FTA associated with the FTA name */
271 gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)
274 gs_int8_t schemabuf[MAXSCHEMASZ];
275 gs_schemahandle_t schema;
276 if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
277 gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
280 if ((schema=ftaschema_parse_string(schemabuf))<0) {
281 gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");
287 /* get the asci schema definition for the FTA associated with the FTA name */
288 gs_sp_t ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)
291 static gs_int8_t schemabuf[MAXSCHEMASZ];
292 if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
293 gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
299 /* control operations keyed of one to one mapping of stream id */
301 ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void * data)
303 struct fta_instance * fi;
304 if ((fi=get_fta(f))<0) {
305 gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
308 return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);
311 /* remove FTA keyed of stream id */
313 ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)
315 struct fta_instance * fi;
316 if ((fi=get_fta(f))<0) {
317 gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
321 fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);
322 ftaschema_free(fi->schema);
328 /* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove
329 the heartbeat tuples*/
331 ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,
332 gs_int32_t tbuf_size, gs_int32_t timeout)
334 gs_uint64_t trace_id;
337 gs_sp_t trace_buffer;
341 res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);
343 if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {
345 myftaid=gscpipc_getftaid();
347 if (ftaschema_get_trace(get_fta(*ftaid)->schema,
348 tbuffer, *size, &trace_id, &sz, &trace))
350 gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");
351 goto get_tuple_again;
354 if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {
355 gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");
356 goto get_tuple_again;
359 /* generate a heartbeat */
360 memcpy(trace_buffer, trace, sz * sizeof(fta_stat));
361 /* append producers fta_stat to the trace */
362 /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */
363 memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));
365 memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));
367 fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);
369 res=2; //indicate that it is a temporal tuple