Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscpapp / appinterface.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include "gsconfig.h"
16 #include "gstypes.h"
17 #include "app.h"
18 #include "fta.h"
19 #include "lapp.h"
20 #include "string.h"
21 #include "stdio.h"
22 #include "stdlib.h"
23 #include "schemaparser.h"
24 #include "gshub.h"
25
26
27 // Defined here to avoid link errors as this array is auto generated for the lfta and referenced in the clearinghouse library which gets linked against the hfta
28 gs_sp_t fta_names[]={0};
29
30
31 /* HIGH LEVEL APPLICATION INTERFACE */
32 /* ================================ */
33
34 struct fta_instance {
35     gs_sp_t name;
36     FTAID ftaid;
37     gs_int32_t used;
38     gs_schemahandle_t schema;
39 };
40
41
42 struct fta_instance * instance_array=0;
43 gs_int32_t instance_array_sz=0;
44
45 static gs_retval_t
46 add_fta(gs_sp_t name, FTAID ftaid,
47         gs_schemahandle_t schema)
48 {
49     gs_int32_t x;
50     if ( instance_array_sz == 0) {
51         if ((instance_array = malloc(sizeof(struct fta_instance)*STARTSZ))==0) {
52             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
53             return -1;
54         }
55         memset(instance_array,0,sizeof(struct fta_instance)*STARTSZ);
56         instance_array_sz = STARTSZ;
57     }
58     for(x=0;(x<instance_array_sz)&&(instance_array[x].used!=0);x++);
59     if (x == instance_array_sz) {
60         gs_int32_t y;
61         instance_array_sz = 2*instance_array_sz;
62         if ((instance_array =
63              realloc(instance_array,instance_array_sz*
64                      sizeof(struct fta_instance)))==0) {
65                  gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
66                  return -1;
67              }
68         for (y=x;y<instance_array_sz;x++)
69             instance_array[y].used=0;
70     }
71     instance_array[x].name=strdup(name);
72     instance_array[x].ftaid=ftaid;
73     instance_array[x].schema=schema;
74     instance_array[x].used=1;
75     return 0;
76 }
77
78 static gs_retval_t
79 rm_fta(FTAID ftaid)
80 {
81     gs_int32_t x;
82     for (x=0;x<instance_array_sz;x++) {
83         if ( (instance_array[x].ftaid.ip=ftaid.ip)
84             && (instance_array[x].ftaid.port==ftaid.port)
85             && (instance_array[x].ftaid.index==ftaid.index)
86             && (instance_array[x].ftaid.streamid==ftaid.streamid)){
87             instance_array[x].used=0;
88         }
89     }
90     return 0;
91 }
92
93
94 static struct fta_instance *
95 get_fta(FTAID ftaid)
96 {
97     gs_int32_t x;
98     for (x=0;x<instance_array_sz;x++) {
99         if (( instance_array[x].used!=0 )
100             && (instance_array[x].ftaid.ip==ftaid.ip)
101             && (instance_array[x].ftaid.port==ftaid.port)
102             && (instance_array[x].ftaid.index==ftaid.index)
103             && (instance_array[x].ftaid.streamid==ftaid.streamid))
104         {
105             return &instance_array[x];
106         }
107     }
108     return 0;
109 }
110
111
112
113 gs_retval_t
114 ftaapp_init(gs_uint32_t bufsz)
115 {
116     
117     endpoint gshub;
118     FTAID myftaid;
119     gs_sp_t name = "app\0";
120     if (hostlib_init(APP,bufsz,DEFAULTDEV,0,0)!=0) {
121         gslog(LOG_EMERG,"ftaap_init::error:could not initialize hostlib\n");
122         return -1;
123     }
124     if (get_hub(&gshub)!=0) {
125          gslog(LOG_EMERG,"ERROR:could not find gshub in appinterface init");
126          return -1;
127     }
128     myftaid=gscpipc_getftaid();
129     if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&myftaid)!=0) {
130         gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
131         return -1;
132     }
133     return 0;
134 }
135
136 /* this should be used before exiting to make sure everything gets
137  cleaned up
138  */
139 gs_retval_t
140 ftaapp_exit()
141 {
142     gs_int32_t x;
143     for (x=0;x<instance_array_sz;x++) {
144         if (instance_array[x].used!=0) {
145             ftaapp_remove_fta(instance_array[x].ftaid,1);
146         }
147     }
148     hostlib_free();
149     return 0;
150 }
151
152 /* adds an FTA by key returns unique streamid which can be used to reference FTA*/
153
154
155 FTAID
156 ftaapp_add_fta(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
157                gs_int32_t command, gs_int32_t sz, void *  data)
158 {
159     gs_int8_t schemabuf[MAXSCHEMASZ];
160     gs_schemahandle_t schema;
161     FTAID f;
162     FTAID ferr;
163     ferr.ip=0;
164     ferr.port=0;
165     ferr.index=0;
166     ferr.streamid=0;
167     
168     
169     if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
170         gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
171         return ferr;
172     }
173     
174     
175     if ((schema=ftaschema_parse_string(schemabuf))<0) {
176         gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
177         fprintf(stderr,"/n%s\n",schemabuf);
178         return ferr;
179     }
180     
181     if ((fta_alloc_instance(gscpipc_getftaid(),&f,name,schemabuf,reusable,
182                             command,sz,data))!=0) {
183         gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
184         ftaschema_free(schema);
185         return ferr;
186     }
187     
188     if (f.streamid==0) {
189         gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
190         ftaschema_free(schema);
191         return ferr;
192     }
193     
194     //gslog(LOG_EMERG,"apptrace adding fta %u %u %u %u\n",f.ip,f.port,f.index,f.streamid);
195     if (add_fta((gs_sp_t)name,f,schema)<0) {
196         gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
197         fta_free_instance(gscpipc_getftaid(),f,1);
198         ftaschema_free(schema);
199         return ferr;
200     }
201     
202     return f;
203 }
204
205 FTAID ftaapp_add_fta_print(FTAname name, gs_uint32_t reuse, gs_uint32_t reusable,
206                            gs_int32_t command, gs_int32_t sz,
207                            void *  data,gs_sp_t path,
208                            gs_sp_t basename, gs_sp_t temporal_field, gs_sp_t split_field,
209                            gs_uint32_t delta, gs_uint32_t split) {
210     gs_int8_t schemabuf[MAXSCHEMASZ];
211     gs_schemahandle_t schema;
212     FTAID f;
213     FTAID ferr;
214     ferr.ip=0;
215     ferr.port=0;
216     ferr.index=0;
217     ferr.streamid=0;
218     
219     
220     if (fta_find(name,reuse,&f,schemabuf,MAXSCHEMASZ)!=0) {
221         gslog(LOG_EMERG,"ftaapp_add_fta::error:could not find FTA\n");
222         return ferr;
223     }
224     
225     if ((schema=ftaschema_parse_string(schemabuf))<0) {
226         gslog(LOG_EMERG,"ftaapp_add_fta::error:could not parse schema\n");
227         return ferr;
228     }
229     
230     if ((fta_alloc_print_instance(gscpipc_getftaid(),
231                                   &f,name,schemabuf,reusable,
232                                   command,sz,data,path,
233                                   basename,temporal_field,split_field,delta,split))!=0) {
234         gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
235         ftaschema_free(schema);
236         return ferr;
237     }
238     
239     if (f.streamid==0) {
240         gslog(LOG_EMERG,"ftaapp_add_fta::error:could instantiate a FTA\n");
241         ftaschema_free(schema);
242         return ferr;
243     }
244     
245     if (add_fta((gs_sp_t)name,f,schema)<0) {
246         gslog(LOG_EMERG,"ftaapp_add_fta::error:could not add fta to internal db\n");
247         fta_free_instance(gscpipc_getftaid(),f,1);
248         ftaschema_free(schema);
249         return ferr;
250     }
251     
252     return f;
253 }
254
255 /* get the schema definition of an FTA */
256 gs_schemahandle_t
257 ftaapp_get_fta_schema(FTAID ftaid)
258 {
259     struct fta_instance * fi;
260     
261     //gslog(LOG_EMERG,"apptrace checking fta %u %u %u %u\n",ftaid.ip,ftaid.port,ftaid.index,ftaid.streamid);
262     
263     if ((fi=get_fta(ftaid))==0) {
264         gslog(LOG_EMERG,"ftaapp_get_fta_schema::error:unknown streamid\n");
265         return -1;
266     }
267     return fi->schema;
268 }
269
270 /* get the asci schema definition for the FTA associated with the FTA name */
271 gs_schemahandle_t ftaapp_get_fta_schema_by_name(gs_sp_t name)
272 {
273     FTAID f;
274     gs_int8_t schemabuf[MAXSCHEMASZ];
275     gs_schemahandle_t schema;
276     if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
277         gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
278         return -1;
279     }
280     if ((schema=ftaschema_parse_string(schemabuf))<0) {
281         gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not parse schema\n");
282         return -1;
283     }
284     return schema;
285 }
286
287 /* get the asci schema definition for the FTA associated with the FTA name */
288 gs_sp_t  ftaapp_get_fta_ascii_schema_by_name(gs_sp_t name)
289 {
290     FTAID f;
291     static gs_int8_t schemabuf[MAXSCHEMASZ];
292     if (fta_find(name,0,&f,schemabuf,MAXSCHEMASZ)!=0) {
293         gslog(LOG_EMERG,"ftaapp_get_fta_schema_by_name::error:could not find FTA\n");
294         return 0;
295     }
296     return schemabuf;
297 }
298
299 /* control operations keyed of one to one mapping of stream id */
300 gs_retval_t
301 ftaapp_control(FTAID f, gs_int32_t command, gs_int32_t sz, void *  data)
302 {
303     struct fta_instance * fi;
304     if ((fi=get_fta(f))<0) {
305         gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
306         return -1;
307     }
308     return fta_control(gscpipc_getftaid(),fi->ftaid,command,sz,data);
309 }
310
311 /* remove FTA keyed of stream id */
312 gs_retval_t
313 ftaapp_remove_fta(FTAID f, gs_uint32_t recursive)
314 {
315     struct fta_instance * fi;
316     if ((fi=get_fta(f))<0) {
317         gslog(LOG_EMERG,"ftaapp_control::error:unknown streamid\n");
318         return -1;
319     }
320     
321     fta_free_instance(gscpipc_getftaid(),fi->ftaid,recursive);
322     ftaschema_free(fi->schema);
323     rm_fta(f);
324     
325     return 0;
326 }
327
328 /* same as sgroup_get_buffer just repeated to have a complet ftapp interface and remove
329  the heartbeat tuples*/
330 gs_retval_t
331 ftaapp_get_tuple(FTAID * ftaid, gs_uint32_t * size, void *tbuffer,
332                  gs_int32_t tbuf_size, gs_int32_t timeout)
333 {
334     gs_uint64_t trace_id;
335     gs_uint32_t sz;
336     fta_stat * trace;
337     gs_sp_t trace_buffer;
338     gs_retval_t res;
339     
340 get_tuple_again:
341     res=gscp_get_buffer(ftaid,(gs_int32_t *)size,tbuffer,tbuf_size,timeout);
342     
343     if ((res==0) && (ftaschema_is_temporal_tuple(get_fta(*ftaid)->schema, tbuffer))) {
344         FTAID myftaid;
345         myftaid=gscpipc_getftaid();
346         /* extract trace */
347         if (ftaschema_get_trace(get_fta(*ftaid)->schema,
348                                 tbuffer, *size, &trace_id, &sz, &trace))
349         {
350             gslog(LOG_EMERG, "ftaapp_get_tuple:Error: temporal tuple with no trace\n");
351             goto get_tuple_again;
352         }
353         
354                 if ((trace_buffer=(gs_sp_t)malloc((sz+1)*sizeof(fta_stat)))==0) {
355             gslog(LOG_EMERG,"ftaapp_get_tuple::Error: allocation for trace tuple failed\n");
356             goto get_tuple_again;
357         }
358         
359         /* generate a heartbeat */
360         memcpy(trace_buffer, trace, sz * sizeof(fta_stat));
361         /* append producers fta_stat to the trace */
362         /* for now we will just fill the FTAID part with 0 of fta_stat, the rest will be cleared */
363         memset(trace_buffer + (sz * sizeof(fta_stat)), 0, sizeof(fta_stat));
364
365         memcpy(trace_buffer + (sz * sizeof(fta_stat)), &myftaid, sizeof(FTAID));
366
367         fta_heartbeat(gscpipc_getftaid(), trace_id, sz+1, (fta_stat *)trace_buffer);
368                 free(trace_buffer);
369         res=2; //indicate that it is a temporal tuple
370     }
371     return res;
372 }