Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / lappinterface.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 #include <gscpipc.h>\r
16 #include <ipcencoding.h>\r
17 #include <stdlib.h>\r
18 #include <stdio.h>\r
19 #include <lapp.h>\r
20 #include <lappregistries.h>\r
21 #include <string.h>\r
22 #include <unistd.h>\r
23 #include <signal.h>\r
24 #include <sys/mman.h>\r
25 #include "rdtsc.h"\r
26 // If POLLING is defined applications poll every 100 msec instead of blocking\r
27 #define POLLING\r
28 \r
29 struct processtate curprocess = {0,0,0,255,0};\r
30 struct FTAID clearinghouseftaid = {0,0,0,0};\r
31 \r
32 /*\r
33  * sends the message passed in buf and waits for a result\r
34  * if a message returned is not a result it is put in the\r
35  * request queue. The resultsbuf has to be large enough\r
36  * for the largest result\r
37  */\r
38 gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t  msg, gs_sp_t  result)\r
39 {\r
40     struct hostcall * h = (struct hostcall *) msg;\r
41     gs_int8_t  buf[MAXMSGSZ];\r
42     FTAID from;\r
43     gs_int32_t length;\r
44     gs_int32_t lowop;\r
45 #ifdef PRINTMSG\r
46     fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "\r
47             "type %u with length %u\n",\r
48             (f.ip>>24)&0xff,\r
49             (f.ip>>16)&0xff,\r
50             (f.ip>>8)&0xff,\r
51             (f.ip)&0xff,\r
52             f.port,h->callid,h->size);\r
53 #endif\r
54     if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {\r
55         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
56         return -1;\r
57     }\r
58     h=(struct hostcall *) buf;\r
59     while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {\r
60 #ifdef PRINTMSG\r
61         fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"\r
62                 " of type %u with length %u\n",\r
63                 (from.ip>>24)&0xff,\r
64                 (from.ip>>16)&0xff,\r
65                 (from.ip>>8)&0xff,\r
66                 (from.ip)&0xff,\r
67                 from.port,\r
68                 h->callid,h->size);\r
69 #endif\r
70         if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {\r
71             h=(struct hostcall *) buf;\r
72             if (h->callid > RESULT_OPCODE_BASE) {\r
73                 memcpy(result,buf,length);\r
74                 return 0;\r
75             }\r
76             if (sidequeue_append(from,buf,length)<0) {\r
77                 gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
78                 return -1;\r
79             }\r
80         }\r
81     }\r
82     gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");\r
83     return -1;\r
84 }\r
85 \r
86 \r
87 gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])\r
88 {\r
89     FILE * f;\r
90     \r
91     if (curprocess.active != 0 ) {\r
92         return -1;\r
93     }\r
94     \r
95     switch (type) {\r
96         case CLEARINGHOUSE:\r
97             if (gscpipc_init(1) < 0) {\r
98                 gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "\r
99                       "clearinghouse process\n");\r
100                 return -1;\r
101             }\r
102             break;\r
103         case LFTA:\r
104 #ifdef __linux__\r
105             mlockall(MCL_CURRENT|MCL_FUTURE);\r
106 #endif\r
107         case APP:\r
108         case HFTA:\r
109             if (gscpipc_init(0) < 0) {\r
110                 gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "\r
111                       "non clearinghouse process\n");\r
112                 return -1;\r
113             }\r
114             break;\r
115         default:\r
116             gslog(LOG_EMERG,"ERROR:Unknown process type\n");\r
117             return -1;\r
118     }\r
119     \r
120     // if the buffersize is zero then allocating shared memory\r
121     // will fail. So only use it for the clearinghouse and LFTAs\r
122     if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {\r
123         gslog(LOG_EMERG,\r
124               "ERROR:buffersize in hostlib_init has to "\r
125               "be at least %u Bytes long\n",\r
126               4*MAXTUPLESZ);\r
127         return -1;\r
128     }\r
129     \r
130     curprocess.type=type;\r
131     curprocess.buffersize=buffersize;\r
132     curprocess.active = 1;\r
133     curprocess.deviceid=deviceid;\r
134     curprocess.mapcnt=mapcnt;\r
135     curprocess.map=map;\r
136     return 0;\r
137 }\r
138 \r
139 void hostlib_free()\r
140 {\r
141     if (curprocess.active != 1 ) {\r
142         return;\r
143     }\r
144     curprocess.active = 0;\r
145     gscpipc_free();\r
146 }\r
147 \r
148 \r
149 gs_retval_t fta_find(FTAname name, gs_uint32_t  reuse, FTAID *ftaid,\r
150                      gs_sp_t  schema, gs_int32_t schemasz)\r
151 {\r
152     gs_int8_t  rb[MAXRES];\r
153     struct fta_find_arg a;\r
154     struct ftafind_result * sr = (struct ftafind_result *)rb;\r
155     \r
156     a.h.callid = FTA_LOOKUP;\r
157     a.h.size = sizeof(struct fta_find_arg);\r
158     a.reuse=reuse;\r
159     if (strlen(name)>=(MAXFTANAME-1)) {\r
160         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
161         return -1;\r
162     }\r
163     strcpy(a.name,name);\r
164     ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
165     if (sr->h.callid != FTAFIND_RESULT) {\r
166         gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");\r
167         return -1;\r
168     }\r
169     if (sr->result >= 0) {\r
170         if (schema !=0) {\r
171             if (strlen(sr->schema) >= schemasz) {\r
172                 gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");\r
173                 return -1;\r
174             } else {\r
175                 strcpy(schema,sr->schema);\r
176             }\r
177         }\r
178         *ftaid=sr->f;\r
179     }\r
180     return sr->result;\r
181 }\r
182 \r
183 gs_retval_t fta_alloc_instance(FTAID subscriber,\r
184                                FTAID * ftaid, FTAname name, gs_sp_t schema,\r
185                                gs_uint32_t  reusable,\r
186                                gs_int32_t command, gs_int32_t sz, void *  data)\r
187 {\r
188     gs_int8_t  rb[MAXRES];\r
189     struct fta_alloc_instance_arg * a;\r
190     struct fta_result * fr = (struct fta_result *)rb;\r
191     struct ringbuf *r;\r
192     \r
193     /* make sure we have the share memory required */\r
194     if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {\r
195         gslog(LOG_EMERG,"ERROR:could not allocate shared memory"\r
196               "for FTA %s\n",name);\r
197         return -1;\r
198     }\r
199     \r
200     if (strlen(name)>=(MAXFTANAME-1)) {\r
201         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
202         return -1;\r
203     }\r
204     \r
205     if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
206         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
207         return -1;\r
208     }\r
209     \r
210     a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);\r
211     \r
212     a->h.callid = FTA_ALLOC_INSTANCE;\r
213     a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;\r
214     a->f=*ftaid;\r
215     a->subscriber=subscriber;\r
216     a->reusable=reusable;\r
217     a->command = command;\r
218     a->sz = sz;\r
219     memcpy(&a->data[0],data,sz);\r
220     strcpy(a->name,name);\r
221     strcpy(a->schema,schema);\r
222     \r
223     ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);\r
224     \r
225     if (fr->h.callid != FTA_RESULT) {\r
226         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
227         return -1;\r
228     }\r
229     \r
230     *ftaid=fr->f;\r
231     \r
232     if (fr->result==0) {\r
233         gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);\r
234         return streamregistry_add(*ftaid,r);\r
235     }\r
236     \r
237     return fr->result;\r
238 }\r
239 \r
240 gs_retval_t fta_alloc_print_instance(FTAID subscriber,\r
241                                      FTAID * ftaid,\r
242                                      FTAname name, gs_sp_t schema, gs_uint32_t  reusable,\r
243                                      gs_int32_t command, gs_int32_t sz, void *  data,\r
244                                      gs_sp_t  path,gs_sp_t  basename,\r
245                                      gs_sp_t  temporal_field, gs_sp_t  split_field,\r
246                                      gs_uint32_t  delta, gs_uint32_t  split)\r
247 {\r
248     gs_int8_t  rb[MAXRES];\r
249     struct fta_alloc_instance_arg * a;\r
250     struct fta_result * fr = (struct fta_result *)rb;\r
251     \r
252     if ((strlen(path)>=MAXPRINTSTRING-1)\r
253         || (strlen(basename)>=MAXPRINTSTRING-1)\r
254         || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {\r
255         gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"\r
256               " arguments to long\n");\r
257         return -1;\r
258     }\r
259     if (strlen(name)>=(MAXFTANAME-1)) {\r
260         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
261         return -1;\r
262     }\r
263     \r
264     if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
265         gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
266         return -1;\r
267     }\r
268     \r
269     a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);\r
270     \r
271     a->h.callid = FTA_ALLOC_PRINT_INSTANCE;\r
272     a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;\r
273     a->f=*ftaid;\r
274     a->subscriber=subscriber;\r
275     a->reusable=reusable;\r
276     a->split=split;\r
277     strcpy(a->name,name);\r
278     strcpy(a->schema,schema);\r
279     a->command = command;\r
280     a->sz = sz;\r
281     strcpy(a->path,path);\r
282     strcpy(a->basename,basename);\r
283     strcpy(a->temporal_field,temporal_field);\r
284     strcpy(a->split_field,split_field);\r
285     a->delta=delta;\r
286     memcpy(&a->data[0],data,sz);\r
287     \r
288     ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);\r
289     \r
290     if (fr->h.callid != FTA_RESULT) {\r
291         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
292         return -1;\r
293     }\r
294     \r
295     *ftaid=fr->f;\r
296     \r
297     return fr->result;\r
298 }\r
299 \r
300 gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t  recursive)\r
301 {\r
302     gs_int8_t  rb[MAXRES];\r
303     struct fta_free_instance_arg a;\r
304     struct standard_result * sr = (struct standard_result *)rb;\r
305     struct ringbuf *r;\r
306     \r
307     a.h.callid = FTA_FREE_INSTANCE;\r
308     a.h.size = sizeof(struct fta_free_instance_arg);\r
309     a.subscriber=subscriber;\r
310     a.f=ftaid;\r
311     a.recursive=recursive;\r
312     ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);\r
313     if (sr->h.callid != STANDARD_RESULT) {\r
314         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
315         return -1;\r
316     }\r
317     \r
318     /* make sure we remove the mapping*/\r
319     streamregistry_remove(ftaid);\r
320     \r
321     return sr->result;\r
322 }\r
323 \r
324 gs_retval_t fta_control(FTAID subscriber,\r
325                         FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)\r
326 {\r
327     gs_int8_t  rb[MAXRES];\r
328     struct fta_control_arg * a;\r
329     struct standard_result * sr = (struct standard_result *)rb;\r
330     \r
331     a = alloca(sizeof(struct fta_control_arg) + sz);\r
332     \r
333     a->h.callid = FTA_CONTROL;\r
334     a->h.size = sizeof(struct fta_control_arg)+ sz;\r
335     a->subscriber=subscriber;\r
336     a->f=ftaid;\r
337     a->command = command;\r
338     a->sz = sz;\r
339     memcpy(&a->data[0],value,sz);\r
340     \r
341     ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);\r
342     \r
343     if (sr->h.callid != STANDARD_RESULT) {\r
344         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
345         return -1;\r
346     }\r
347     \r
348     return sr->result;\r
349 }\r
350 \r
351 gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,\r
352                           gs_uint32_t  sz, fta_stat * trace){\r
353 #ifdef CLEARINGHOUSE_HEARTBEAT\r
354     struct fta_heartbeat_arg  * a;\r
355     a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));\r
356     a->h.callid = FTA_HEARTBEAT;\r
357     a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));\r
358     a->sender=self;\r
359     a->trace_id=trace_id;\r
360     a->sz=sz;\r
361     if (sz!=0) {\r
362         memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));\r
363     }\r
364 #ifdef PRINTMSG\r
365     fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "\r
366             "type %u with length %u\n",\r
367             (clearinghouseftaid.ip>>24)&0xff,\r
368             (clearinghouseftaid.ip>>16)&0xff,\r
369             (clearinghouseftaid.ip>>8)&0xff,\r
370             (clearinghouseftaid.ip)&0xff,\r
371             clearinghouseftaid.port,a->h.callid,a->h.size);\r
372 #endif\r
373     if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {\r
374         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
375         return -1;\r
376     }\r
377 #endif\r
378     return 0;\r
379 }\r
380 \r
381 gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){\r
382     struct fta_notify_producer_failure_arg  a;\r
383     a.h.callid = FTA_PRODUCER_FAILURE;\r
384     a.h.size = sizeof(struct fta_notify_producer_failure_arg);\r
385     a.sender=self;\r
386     a.producer=producer;\r
387 #ifdef PRINTMSG\r
388     fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "\r
389             "type %u with length %u\n",\r
390             (clearinghouseftaid.ip>>24)&0xff,\r
391             (clearinghouseftaid.ip>>16)&0xff,\r
392             (clearinghouseftaid.ip>>8)&0xff,\r
393             (clearinghouseftaid.ip)&0xff,\r
394             clearinghouseftaid.port,a.h.callid,a.h.size);\r
395 #endif\r
396     if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {\r
397         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
398         return -1;\r
399     }\r
400     return 0;\r
401 }\r
402 \r
403 gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)\r
404 {\r
405     gs_int8_t  rb[MAXRES];\r
406     struct process_control_arg * a;\r
407     struct standard_result * sr = (struct standard_result *)rb;\r
408     \r
409     \r
410     a = alloca(sizeof(struct process_control_arg) + sz);\r
411     \r
412     a->h.callid = PROCESS_CONTROL;\r
413     a->h.size = sizeof(struct process_control_arg)+ sz;\r
414     a->command = command;\r
415     a->sz = sz;\r
416     memcpy(&a->data[0],value,sz);\r
417     \r
418     ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);\r
419     \r
420     if (sr->h.callid != STANDARD_RESULT) {\r
421         gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
422         return -1;\r
423     }\r
424     \r
425     return sr->result;\r
426 }\r
427 \r
428 \r
429 static void timeouthandler ()\r
430 {\r
431     struct timeout_result a;\r
432     \r
433     a.h.callid=TIMEOUT;\r
434     a.h.size=sizeof(struct timeout_result);\r
435     if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {\r
436         gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
437     }\r
438 }\r
439 \r
440 gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,\r
441                             gs_int32_t tbuf_size, gs_int32_t timeout)\r
442 {\r
443     struct ringbuf * r;\r
444     FTAID from;\r
445     gs_int32_t length;\r
446     gs_int8_t  buf[MAXMSGSZ];\r
447     gs_int32_t lopp;\r
448     FTAID * f;\r
449     static      gs_uint64_t s1=0;\r
450     static      gs_uint64_t s2;\r
451     if (s1==0) {\r
452         s1=rdtsc();\r
453     }\r
454     s2=rdtsc();\r
455     cycles+=(s2-s1);\r
456 start:\r
457 #ifdef PRINTMSG\r
458     fprintf(stderr,"CHECK RINGBUFS\n");\r
459 #endif\r
460 #ifndef POLLING\r
461     /* use chance to cleanout message queue no reason\r
462      to keep anything else */\r
463     while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
464 #endif\r
465     \r
466     streamregistry_getactiveringbuf_reset();\r
467     while ((r=streamregistry_getactiveringbuf())>0) {\r
468 #ifdef PRINTMSG\r
469             fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"\r
470                 "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
471                 r->length);\r
472             if (UNREAD(r)) {\r
473             fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,\r
474                     (CURREAD(r)->f.ip>>24)&0xff,\r
475                     (CURREAD(r)->f.ip>>16)&0xff,\r
476                     (CURREAD(r)->f.ip>>8)&0xff,\r
477                     (CURREAD(r)->f.ip)&0xff,\r
478                     CURREAD(r)->f.port,\r
479                     CURREAD(r)->f.streamid,\r
480                     CURREAD(r)->sz);\r
481             }\r
482         \r
483 #endif\r
484         if (UNREAD(r)) {\r
485             *ftaid=(CURREAD(r)->f);\r
486             *size=CURREAD(r)->sz;\r
487             memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);\r
488             intuple++;\r
489             inbytes+=CURREAD(r)->sz;\r
490             ADVANCEREAD(r);\r
491             s1=rdtsc();\r
492             return 0;\r
493         }\r
494     }\r
495     if (timeout == -1) {\r
496         *size=0;\r
497         s1=rdtsc();\r
498         return 1;\r
499     }\r
500     if (timeout !=0) {\r
501         signal(SIGALRM, timeouthandler);\r
502         alarm(timeout);\r
503     }\r
504     \r
505 #ifndef POLLING\r
506 #ifdef PRINTMSG\r
507     fprintf(stderr,"START BLOCKCALLS\n");\r
508 #endif\r
509     streamregistry_getactiveftaid_reset();\r
510     while ((f=streamregistry_getactiveftaid())!=0) {\r
511         struct gscp_get_buffer_arg a;\r
512         a.h.callid = GSCP_GET_BUFFER;\r
513         a.h.size = sizeof(struct gscp_get_buffer_arg);\r
514         a.timeout = timeout;\r
515 #ifdef PRINTMSG\r
516         fprintf(stderr,"Waiting for  %u.%u.%u.%u:%u\n",\r
517                 (f->ip>>24)&0xff,\r
518                 (f->ip>>16)&0xff,\r
519                 (f->ip>>8)&0xff,\r
520                 (f->ip)&0xff,\r
521                 f->port\r
522                 );\r
523 #endif\r
524         if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {\r
525             s1=rdtsc();\r
526             return -1;\r
527         }\r
528     }\r
529 #ifdef PRINTMSG\r
530     fprintf(stderr,"BLOCK\n");\r
531 #endif\r
532     while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {\r
533 #else  // If we poll we return after 100 msec\r
534     sleepagain:\r
535         while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {\r
536 #endif\r
537             struct standard_result * sr = (struct standard_result *) buf;\r
538 #ifdef PRINTMSG\r
539             fprintf(stderr,"Got return code %u\n",sr->h.callid);\r
540 #endif\r
541             if (lopp==FTACALLBACK) {\r
542                 if (timeout != 0) {\r
543                     signal(SIGALRM, SIG_IGN);\r
544                 }\r
545                 if (sr->h.callid == WAKEUP) {\r
546                     /* use chance to cleanout message queue no reason\r
547                      to keep anything else */\r
548                     while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
549                     goto start;\r
550                 }\r
551                 if (sr->h.callid == TIMEOUT) {\r
552                     /* use chance to cleanout message queue no reason\r
553                      to keep anything else */\r
554                     while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
555                     *size=0;\r
556                     s1=rdtsc();\r
557                     return 1;\r
558                 }\r
559                 if (sidequeue_append(from,buf,length)<0) {\r
560                     gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
561                     s1=rdtsc();\r
562                     return -1;\r
563                 }\r
564             }\r
565         }\r
566 #ifdef POLLING\r
567         streamregistry_getactiveringbuf_reset();\r
568         while ((r=streamregistry_getactiveringbuf())>0) {\r
569 #ifdef PRINTMSG\r
570             fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"\r
571                     "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
572                     r->length);\r
573             if (UNREAD(r)) {\r
574                 fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,\r
575                         (CURREAD(r)->f.ip>>24)&0xff,\r
576                         (CURREAD(r)->f.ip>>16)&0xff,\r
577                         (CURREAD(r)->f.ip>>8)&0xff,\r
578                         (CURREAD(r)->f.ip)&0xff,\r
579                         CURREAD(r)->f.port,\r
580                         CURREAD(r)->f.streamid,\r
581                         CURREAD(r)->sz);\r
582             }\r
583             \r
584 #endif\r
585             if (UNREAD(r)) {\r
586                 *ftaid=(CURREAD(r)->f);\r
587                 *size=CURREAD(r)->sz;\r
588                 memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);\r
589                 intuple++;\r
590                 inbytes+=CURREAD(r)->sz;\r
591                 ADVANCEREAD(r);\r
592                 if (timeout != 0) {\r
593                     signal(SIGALRM, SIG_IGN);\r
594                 }\r
595                 s1=rdtsc();\r
596                 return 0;\r
597             }\r
598         }\r
599         goto sleepagain; // Try again\r
600 #endif\r
601         gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");\r
602         /* we should never get here */\r
603         s1=rdtsc();\r
604         return -1;\r
605     }\r
606     \r
607     \r
608     \r