Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / lappinterface.c
index 99196b8..bfaaed8 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-#include <gscpipc.h>
-#include <ipcencoding.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <lapp.h>
-#include <lappregistries.h>
-#include <string.h>
-#include <unistd.h>
-#include <signal.h>
-#include <sys/mman.h>
-#include "rdtsc.h"
-// If POLLING is defined applications poll every 100 msec instead of blocking
-#define POLLING
-
-struct processtate curprocess = {0,0,0,255,0};
-struct FTAID clearinghouseftaid = {0,0,0,0};
-
-/*
- * sends the message passed in buf and waits for a result
- * if a message returned is not a result it is put in the
- * request queue. The resultsbuf has to be large enough
- * for the largest result
- */
-gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t  msg, gs_sp_t  result)
-{
-    struct hostcall * h = (struct hostcall *) msg;
-    gs_int8_t  buf[MAXMSGSZ];
-    FTAID from;
-    gs_int32_t length;
-    gs_int32_t lowop;
-#ifdef PRINTMSG
-    fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "
-            "type %u with length %u\n",
-            (f.ip>>24)&0xff,
-            (f.ip>>16)&0xff,
-            (f.ip>>8)&0xff,
-            (f.ip)&0xff,
-            f.port,h->callid,h->size);
-#endif
-    if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {
-        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-        return -1;
-    }
-    h=(struct hostcall *) buf;
-    while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {
-#ifdef PRINTMSG
-        fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"
-                " of type %u with length %u\n",
-                (from.ip>>24)&0xff,
-                (from.ip>>16)&0xff,
-                (from.ip>>8)&0xff,
-                (from.ip)&0xff,
-                from.port,
-                h->callid,h->size);
-#endif
-        if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
-            h=(struct hostcall *) buf;
-            if (h->callid > RESULT_OPCODE_BASE) {
-                memcpy(result,buf,length);
-                return 0;
-            }
-            if (sidequeue_append(from,buf,length)<0) {
-                gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
-                return -1;
-            }
-        }
-    }
-    gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");
-    return -1;
-}
-
-
-gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])
-{
-    FILE * f;
-    
-    if (curprocess.active != 0 ) {
-        return -1;
-    }
-    
-    switch (type) {
-        case CLEARINGHOUSE:
-            if (gscpipc_init(1) < 0) {
-                gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
-                      "clearinghouse process\n");
-                return -1;
-            }
-            break;
-        case LFTA:
-#ifdef __linux__
-            mlockall(MCL_CURRENT|MCL_FUTURE);
-#endif
-        case APP:
-        case HFTA:
-            if (gscpipc_init(0) < 0) {
-                gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
-                      "non clearinghouse process\n");
-                return -1;
-            }
-            break;
-        default:
-            gslog(LOG_EMERG,"ERROR:Unknown process type\n");
-            return -1;
-    }
-    
-    // if the buffersize is zero then allocating shared memory
-    // will fail. So only use it for the clearinghouse and LFTAs
-    if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {
-        gslog(LOG_EMERG,
-              "ERROR:buffersize in hostlib_init has to "
-              "be at least %u Bytes long\n",
-              4*MAXTUPLESZ);
-        return -1;
-    }
-    
-    curprocess.type=type;
-    curprocess.buffersize=buffersize;
-    curprocess.active = 1;
-    curprocess.deviceid=deviceid;
-    curprocess.mapcnt=mapcnt;
-    curprocess.map=map;
-    return 0;
-}
-
-void hostlib_free()
-{
-    if (curprocess.active != 1 ) {
-        return;
-    }
-    curprocess.active = 0;
-    gscpipc_free();
-}
-
-
-gs_retval_t fta_find(FTAname name, gs_uint32_t  reuse, FTAID *ftaid,
-                     gs_sp_t  schema, gs_int32_t schemasz)
-{
-    gs_int8_t  rb[MAXRES];
-    struct fta_find_arg a;
-    struct ftafind_result * sr = (struct ftafind_result *)rb;
-    
-    a.h.callid = FTA_LOOKUP;
-    a.h.size = sizeof(struct fta_find_arg);
-    a.reuse=reuse;
-    if (strlen(name)>=(MAXFTANAME-1)) {
-        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-        return -1;
-    }
-    strcpy(a.name,name);
-    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
-    if (sr->h.callid != FTAFIND_RESULT) {
-        gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");
-        return -1;
-    }
-    if (sr->result >= 0) {
-        if (schema !=0) {
-            if (strlen(sr->schema) >= schemasz) {
-                gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");
-                return -1;
-            } else {
-                strcpy(schema,sr->schema);
-            }
-        }
-        *ftaid=sr->f;
-    }
-    return sr->result;
-}
-
-gs_retval_t fta_alloc_instance(FTAID subscriber,
-                               FTAID * ftaid, FTAname name, gs_sp_t schema,
-                               gs_uint32_t  reusable,
-                               gs_int32_t command, gs_int32_t sz, void *  data)
-{
-    gs_int8_t  rb[MAXRES];
-    struct fta_alloc_instance_arg * a;
-    struct fta_result * fr = (struct fta_result *)rb;
-    struct ringbuf *r;
-    
-    /* make sure we have the share memory required */
-    if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {
-        gslog(LOG_EMERG,"ERROR:could not allocate shared memory"
-              "for FTA %s\n",name);
-        return -1;
-    }
-    
-    if (strlen(name)>=(MAXFTANAME-1)) {
-        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-        return -1;
-    }
-    
-    if (strlen(schema)>=(MAXSCHEMASZ-1)) {
-        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-        return -1;
-    }
-    
-    a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
-    
-    a->h.callid = FTA_ALLOC_INSTANCE;
-    a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
-    a->f=*ftaid;
-    a->subscriber=subscriber;
-    a->reusable=reusable;
-    a->command = command;
-    a->sz = sz;
-    memcpy(&a->data[0],data,sz);
-    strcpy(a->name,name);
-    strcpy(a->schema,schema);
-    
-    ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
-    
-    if (fr->h.callid != FTA_RESULT) {
-        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-        return -1;
-    }
-    
-    *ftaid=fr->f;
-    
-    if (fr->result==0) {
-        gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
-        return streamregistry_add(*ftaid,r);
-    }
-    
-    return fr->result;
-}
-
-gs_retval_t fta_alloc_print_instance(FTAID subscriber,
-                                     FTAID * ftaid,
-                                     FTAname name, gs_sp_t schema, gs_uint32_t  reusable,
-                                     gs_int32_t command, gs_int32_t sz, void *  data,
-                                     gs_sp_t  path,gs_sp_t  basename,
-                                     gs_sp_t  temporal_field, gs_sp_t  split_field,
-                                     gs_uint32_t  delta, gs_uint32_t  split)
-{
-    gs_int8_t  rb[MAXRES];
-    struct fta_alloc_instance_arg * a;
-    struct fta_result * fr = (struct fta_result *)rb;
-    
-    if ((strlen(path)>=MAXPRINTSTRING-1)
-        || (strlen(basename)>=MAXPRINTSTRING-1)
-        || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {
-        gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"
-              " arguments to long\n");
-        return -1;
-    }
-    if (strlen(name)>=(MAXFTANAME-1)) {
-        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-        return -1;
-    }
-    
-    if (strlen(schema)>=(MAXSCHEMASZ-1)) {
-        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
-        return -1;
-    }
-    
-    a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
-    
-    a->h.callid = FTA_ALLOC_PRINT_INSTANCE;
-    a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
-    a->f=*ftaid;
-    a->subscriber=subscriber;
-    a->reusable=reusable;
-    a->split=split;
-    strcpy(a->name,name);
-    strcpy(a->schema,schema);
-    a->command = command;
-    a->sz = sz;
-    strcpy(a->path,path);
-    strcpy(a->basename,basename);
-    strcpy(a->temporal_field,temporal_field);
-    strcpy(a->split_field,split_field);
-    a->delta=delta;
-    memcpy(&a->data[0],data,sz);
-    
-    ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
-    
-    if (fr->h.callid != FTA_RESULT) {
-        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-        return -1;
-    }
-    
-    *ftaid=fr->f;
-    
-    return fr->result;
-}
-
-gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t  recursive)
-{
-    gs_int8_t  rb[MAXRES];
-    struct fta_free_instance_arg a;
-    struct standard_result * sr = (struct standard_result *)rb;
-    struct ringbuf *r;
-    
-    a.h.callid = FTA_FREE_INSTANCE;
-    a.h.size = sizeof(struct fta_free_instance_arg);
-    a.subscriber=subscriber;
-    a.f=ftaid;
-    a.recursive=recursive;
-    ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);
-    if (sr->h.callid != STANDARD_RESULT) {
-        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-        return -1;
-    }
-    
-    /* make sure we remove the mapping*/
-    streamregistry_remove(ftaid);
-    
-    return sr->result;
-}
-
-gs_retval_t fta_control(FTAID subscriber,
-                        FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
-{
-    gs_int8_t  rb[MAXRES];
-    struct fta_control_arg * a;
-    struct standard_result * sr = (struct standard_result *)rb;
-    
-    a = alloca(sizeof(struct fta_control_arg) + sz);
-    
-    a->h.callid = FTA_CONTROL;
-    a->h.size = sizeof(struct fta_control_arg)+ sz;
-    a->subscriber=subscriber;
-    a->f=ftaid;
-    a->command = command;
-    a->sz = sz;
-    memcpy(&a->data[0],value,sz);
-    
-    ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
-    
-    if (sr->h.callid != STANDARD_RESULT) {
-        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-        return -1;
-    }
-    
-    return sr->result;
-}
-
-gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,
-                          gs_uint32_t  sz, fta_stat * trace){
-#ifdef CLEARINGHOUSE_HEARTBEAT
-    struct fta_heartbeat_arg  * a;
-    a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));
-    a->h.callid = FTA_HEARTBEAT;
-    a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));
-    a->sender=self;
-    a->trace_id=trace_id;
-    a->sz=sz;
-    if (sz!=0) {
-        memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));
-    }
-#ifdef PRINTMSG
-    fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "
-            "type %u with length %u\n",
-            (clearinghouseftaid.ip>>24)&0xff,
-            (clearinghouseftaid.ip>>16)&0xff,
-            (clearinghouseftaid.ip>>8)&0xff,
-            (clearinghouseftaid.ip)&0xff,
-            clearinghouseftaid.port,a->h.callid,a->h.size);
-#endif
-    if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {
-        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-        return -1;
-    }
-#endif
-    return 0;
-}
-
-gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){
-    struct fta_notify_producer_failure_arg  a;
-    a.h.callid = FTA_PRODUCER_FAILURE;
-    a.h.size = sizeof(struct fta_notify_producer_failure_arg);
-    a.sender=self;
-    a.producer=producer;
-#ifdef PRINTMSG
-    fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "
-            "type %u with length %u\n",
-            (clearinghouseftaid.ip>>24)&0xff,
-            (clearinghouseftaid.ip>>16)&0xff,
-            (clearinghouseftaid.ip>>8)&0xff,
-            (clearinghouseftaid.ip)&0xff,
-            clearinghouseftaid.port,a.h.callid,a.h.size);
-#endif
-    if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {
-        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-        return -1;
-    }
-    return 0;
-}
-
-gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
-{
-    gs_int8_t  rb[MAXRES];
-    struct process_control_arg * a;
-    struct standard_result * sr = (struct standard_result *)rb;
-    
-    
-    a = alloca(sizeof(struct process_control_arg) + sz);
-    
-    a->h.callid = PROCESS_CONTROL;
-    a->h.size = sizeof(struct process_control_arg)+ sz;
-    a->command = command;
-    a->sz = sz;
-    memcpy(&a->data[0],value,sz);
-    
-    ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
-    
-    if (sr->h.callid != STANDARD_RESULT) {
-        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
-        return -1;
-    }
-    
-    return sr->result;
-}
-
-
-static void timeouthandler ()
-{
-    struct timeout_result a;
-    
-    a.h.callid=TIMEOUT;
-    a.h.size=sizeof(struct timeout_result);
-    if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {
-        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
-    }
-}
-
-gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,
-                            gs_int32_t tbuf_size, gs_int32_t timeout)
-{
-    struct ringbuf * r;
-    FTAID from;
-    gs_int32_t length;
-    gs_int8_t  buf[MAXMSGSZ];
-    gs_int32_t lopp;
-    FTAID * f;
-    static     gs_uint64_t s1=0;
-    static     gs_uint64_t s2;
-    if (s1==0) {
-        s1=rdtsc();
-    }
-    s2=rdtsc();
-    cycles+=(s2-s1);
-start:
-#ifdef PRINTMSG
-    fprintf(stderr,"CHECK RINGBUFS\n");
-#endif
-#ifndef POLLING
-    /* use chance to cleanout message queue no reason
-     to keep anything else */
-    while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
-#endif
-    
-    streamregistry_getactiveringbuf_reset();
-    while ((r=streamregistry_getactiveringbuf())>0) {
-#ifdef PRINTMSG
-           fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
-                "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
-                r->length);
-           if (UNREAD(r)) {
-            fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
-                    (CURREAD(r)->f.ip>>24)&0xff,
-                    (CURREAD(r)->f.ip>>16)&0xff,
-                    (CURREAD(r)->f.ip>>8)&0xff,
-                    (CURREAD(r)->f.ip)&0xff,
-                    CURREAD(r)->f.port,
-                    CURREAD(r)->f.streamid,
-                    CURREAD(r)->sz);
-           }
-        
-#endif
-        if (UNREAD(r)) {
-            *ftaid=(CURREAD(r)->f);
-            *size=CURREAD(r)->sz;
-            memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
-            intuple++;
-            inbytes+=CURREAD(r)->sz;
-            ADVANCEREAD(r);
-            s1=rdtsc();
-            return 0;
-        }
-    }
-    if (timeout == -1) {
-        *size=0;
-        s1=rdtsc();
-        return 1;
-    }
-    if (timeout !=0) {
-        signal(SIGALRM, timeouthandler);
-        alarm(timeout);
-    }
-    
-#ifndef POLLING
-#ifdef PRINTMSG
-    fprintf(stderr,"START BLOCKCALLS\n");
-#endif
-    streamregistry_getactiveftaid_reset();
-    while ((f=streamregistry_getactiveftaid())!=0) {
-        struct gscp_get_buffer_arg a;
-        a.h.callid = GSCP_GET_BUFFER;
-        a.h.size = sizeof(struct gscp_get_buffer_arg);
-        a.timeout = timeout;
-#ifdef PRINTMSG
-        fprintf(stderr,"Waiting for  %u.%u.%u.%u:%u\n",
-                (f->ip>>24)&0xff,
-                (f->ip>>16)&0xff,
-                (f->ip>>8)&0xff,
-                (f->ip)&0xff,
-                f->port
-                );
-#endif
-        if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
-            s1=rdtsc();
-            return -1;
-        }
-    }
-#ifdef PRINTMSG
-    fprintf(stderr,"BLOCK\n");
-#endif
-    while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {
-#else  // If we poll we return after 100 msec
-    sleepagain:
-        while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {
-#endif
-            struct standard_result * sr = (struct standard_result *) buf;
-#ifdef PRINTMSG
-            fprintf(stderr,"Got return code %u\n",sr->h.callid);
-#endif
-            if (lopp==FTACALLBACK) {
-                if (timeout != 0) {
-                    signal(SIGALRM, SIG_IGN);
-                }
-                if (sr->h.callid == WAKEUP) {
-                    /* use chance to cleanout message queue no reason
-                     to keep anything else */
-                    while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
-                    goto start;
-                }
-                if (sr->h.callid == TIMEOUT) {
-                    /* use chance to cleanout message queue no reason
-                     to keep anything else */
-                    while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
-                    *size=0;
-                    s1=rdtsc();
-                    return 1;
-                }
-                if (sidequeue_append(from,buf,length)<0) {
-                    gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
-                    s1=rdtsc();
-                    return -1;
-                }
-            }
-        }
-#ifdef POLLING
-        streamregistry_getactiveringbuf_reset();
-        while ((r=streamregistry_getactiveringbuf())>0) {
-#ifdef PRINTMSG
-            fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
-                    "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
-                    r->length);
-            if (UNREAD(r)) {
-                fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
-                        (CURREAD(r)->f.ip>>24)&0xff,
-                        (CURREAD(r)->f.ip>>16)&0xff,
-                        (CURREAD(r)->f.ip>>8)&0xff,
-                        (CURREAD(r)->f.ip)&0xff,
-                        CURREAD(r)->f.port,
-                        CURREAD(r)->f.streamid,
-                        CURREAD(r)->sz);
-            }
-            
-#endif
-            if (UNREAD(r)) {
-                *ftaid=(CURREAD(r)->f);
-                *size=CURREAD(r)->sz;
-                memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
-                intuple++;
-                inbytes+=CURREAD(r)->sz;
-                ADVANCEREAD(r);
-                if (timeout != 0) {
-                    signal(SIGALRM, SIG_IGN);
-                }
-                s1=rdtsc();
-                return 0;
-            }
-        }
-        goto sleepagain; // Try again
-#endif
-        gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");
-        /* we should never get here */
-        s1=rdtsc();
-        return -1;
-    }
-    
-    
-    
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+#include <gscpipc.h>\r
+#include <ipcencoding.h>\r
+#include <stdlib.h>\r
+#include <stdio.h>\r
+#include <lapp.h>\r
+#include <lappregistries.h>\r
+#include <string.h>\r
+#include <unistd.h>\r
+#include <signal.h>\r
+#include <sys/mman.h>\r
+#include "rdtsc.h"\r
+// If POLLING is defined applications poll every 100 msec instead of blocking\r
+#define POLLING\r
+\r
+struct processtate curprocess = {0,0,0,255,0};\r
+struct FTAID clearinghouseftaid = {0,0,0,0};\r
+\r
+/*\r
+ * sends the message passed in buf and waits for a result\r
+ * if a message returned is not a result it is put in the\r
+ * request queue. The resultsbuf has to be large enough\r
+ * for the largest result\r
+ */\r
+gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t  msg, gs_sp_t  result)\r
+{\r
+    struct hostcall * h = (struct hostcall *) msg;\r
+    gs_int8_t  buf[MAXMSGSZ];\r
+    FTAID from;\r
+    gs_int32_t length;\r
+    gs_int32_t lowop;\r
+#ifdef PRINTMSG\r
+    fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "\r
+            "type %u with length %u\n",\r
+            (f.ip>>24)&0xff,\r
+            (f.ip>>16)&0xff,\r
+            (f.ip>>8)&0xff,\r
+            (f.ip)&0xff,\r
+            f.port,h->callid,h->size);\r
+#endif\r
+    if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+        return -1;\r
+    }\r
+    h=(struct hostcall *) buf;\r
+    while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {\r
+#ifdef PRINTMSG\r
+        fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"\r
+                " of type %u with length %u\n",\r
+                (from.ip>>24)&0xff,\r
+                (from.ip>>16)&0xff,\r
+                (from.ip>>8)&0xff,\r
+                (from.ip)&0xff,\r
+                from.port,\r
+                h->callid,h->size);\r
+#endif\r
+        if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {\r
+            h=(struct hostcall *) buf;\r
+            if (h->callid > RESULT_OPCODE_BASE) {\r
+                memcpy(result,buf,length);\r
+                return 0;\r
+            }\r
+            if (sidequeue_append(from,buf,length)<0) {\r
+                gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
+                return -1;\r
+            }\r
+        }\r
+    }\r
+    gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");\r
+    return -1;\r
+}\r
+\r
+\r
+gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])\r
+{\r
+    FILE * f;\r
+    \r
+    if (curprocess.active != 0 ) {\r
+        return -1;\r
+    }\r
+    \r
+    switch (type) {\r
+        case CLEARINGHOUSE:\r
+            if (gscpipc_init(1) < 0) {\r
+                gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "\r
+                      "clearinghouse process\n");\r
+                return -1;\r
+            }\r
+            break;\r
+        case LFTA:\r
+#ifdef __linux__\r
+            mlockall(MCL_CURRENT|MCL_FUTURE);\r
+#endif\r
+        case APP:\r
+        case HFTA:\r
+            if (gscpipc_init(0) < 0) {\r
+                gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "\r
+                      "non clearinghouse process\n");\r
+                return -1;\r
+            }\r
+            break;\r
+        default:\r
+            gslog(LOG_EMERG,"ERROR:Unknown process type\n");\r
+            return -1;\r
+    }\r
+    \r
+    // if the buffersize is zero then allocating shared memory\r
+    // will fail. So only use it for the clearinghouse and LFTAs\r
+    if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {\r
+        gslog(LOG_EMERG,\r
+              "ERROR:buffersize in hostlib_init has to "\r
+              "be at least %u Bytes long\n",\r
+              4*MAXTUPLESZ);\r
+        return -1;\r
+    }\r
+    \r
+    curprocess.type=type;\r
+    curprocess.buffersize=buffersize;\r
+    curprocess.active = 1;\r
+    curprocess.deviceid=deviceid;\r
+    curprocess.mapcnt=mapcnt;\r
+    curprocess.map=map;\r
+    return 0;\r
+}\r
+\r
+void hostlib_free()\r
+{\r
+    if (curprocess.active != 1 ) {\r
+        return;\r
+    }\r
+    curprocess.active = 0;\r
+    gscpipc_free();\r
+}\r
+\r
+\r
+gs_retval_t fta_find(FTAname name, gs_uint32_t  reuse, FTAID *ftaid,\r
+                     gs_sp_t  schema, gs_int32_t schemasz)\r
+{\r
+    gs_int8_t  rb[MAXRES];\r
+    struct fta_find_arg a;\r
+    struct ftafind_result * sr = (struct ftafind_result *)rb;\r
+    \r
+    a.h.callid = FTA_LOOKUP;\r
+    a.h.size = sizeof(struct fta_find_arg);\r
+    a.reuse=reuse;\r
+    if (strlen(name)>=(MAXFTANAME-1)) {\r
+        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+        return -1;\r
+    }\r
+    strcpy(a.name,name);\r
+    ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
+    if (sr->h.callid != FTAFIND_RESULT) {\r
+        gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");\r
+        return -1;\r
+    }\r
+    if (sr->result >= 0) {\r
+        if (schema !=0) {\r
+            if (strlen(sr->schema) >= schemasz) {\r
+                gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");\r
+                return -1;\r
+            } else {\r
+                strcpy(schema,sr->schema);\r
+            }\r
+        }\r
+        *ftaid=sr->f;\r
+    }\r
+    return sr->result;\r
+}\r
+\r
+gs_retval_t fta_alloc_instance(FTAID subscriber,\r
+                               FTAID * ftaid, FTAname name, gs_sp_t schema,\r
+                               gs_uint32_t  reusable,\r
+                               gs_int32_t command, gs_int32_t sz, void *  data)\r
+{\r
+    gs_int8_t  rb[MAXRES];\r
+    struct fta_alloc_instance_arg * a;\r
+    struct fta_result * fr = (struct fta_result *)rb;\r
+    struct ringbuf *r;\r
+    \r
+    /* make sure we have the share memory required */\r
+    if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {\r
+        gslog(LOG_EMERG,"ERROR:could not allocate shared memory"\r
+              "for FTA %s\n",name);\r
+        return -1;\r
+    }\r
+    \r
+    if (strlen(name)>=(MAXFTANAME-1)) {\r
+        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+        return -1;\r
+    }\r
+    \r
+    if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
+        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+        return -1;\r
+    }\r
+    \r
+    a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);\r
+    \r
+    a->h.callid = FTA_ALLOC_INSTANCE;\r
+    a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;\r
+    a->f=*ftaid;\r
+    a->subscriber=subscriber;\r
+    a->reusable=reusable;\r
+    a->command = command;\r
+    a->sz = sz;\r
+    memcpy(&a->data[0],data,sz);\r
+    strcpy(a->name,name);\r
+    strcpy(a->schema,schema);\r
+    \r
+    ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);\r
+    \r
+    if (fr->h.callid != FTA_RESULT) {\r
+        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+        return -1;\r
+    }\r
+    \r
+    *ftaid=fr->f;\r
+    \r
+    if (fr->result==0) {\r
+        gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);\r
+        return streamregistry_add(*ftaid,r);\r
+    }\r
+    \r
+    return fr->result;\r
+}\r
+\r
+gs_retval_t fta_alloc_print_instance(FTAID subscriber,\r
+                                     FTAID * ftaid,\r
+                                     FTAname name, gs_sp_t schema, gs_uint32_t  reusable,\r
+                                     gs_int32_t command, gs_int32_t sz, void *  data,\r
+                                     gs_sp_t  path,gs_sp_t  basename,\r
+                                     gs_sp_t  temporal_field, gs_sp_t  split_field,\r
+                                     gs_uint32_t  delta, gs_uint32_t  split)\r
+{\r
+    gs_int8_t  rb[MAXRES];\r
+    struct fta_alloc_instance_arg * a;\r
+    struct fta_result * fr = (struct fta_result *)rb;\r
+    \r
+    if ((strlen(path)>=MAXPRINTSTRING-1)\r
+        || (strlen(basename)>=MAXPRINTSTRING-1)\r
+        || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {\r
+        gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"\r
+              " arguments to long\n");\r
+        return -1;\r
+    }\r
+    if (strlen(name)>=(MAXFTANAME-1)) {\r
+        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+        return -1;\r
+    }\r
+    \r
+    if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
+        gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
+        return -1;\r
+    }\r
+    \r
+    a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);\r
+    \r
+    a->h.callid = FTA_ALLOC_PRINT_INSTANCE;\r
+    a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;\r
+    a->f=*ftaid;\r
+    a->subscriber=subscriber;\r
+    a->reusable=reusable;\r
+    a->split=split;\r
+    strcpy(a->name,name);\r
+    strcpy(a->schema,schema);\r
+    a->command = command;\r
+    a->sz = sz;\r
+    strcpy(a->path,path);\r
+    strcpy(a->basename,basename);\r
+    strcpy(a->temporal_field,temporal_field);\r
+    strcpy(a->split_field,split_field);\r
+    a->delta=delta;\r
+    memcpy(&a->data[0],data,sz);\r
+    \r
+    ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);\r
+    \r
+    if (fr->h.callid != FTA_RESULT) {\r
+        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+        return -1;\r
+    }\r
+    \r
+    *ftaid=fr->f;\r
+    \r
+    return fr->result;\r
+}\r
+\r
+gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t  recursive)\r
+{\r
+    gs_int8_t  rb[MAXRES];\r
+    struct fta_free_instance_arg a;\r
+    struct standard_result * sr = (struct standard_result *)rb;\r
+    struct ringbuf *r;\r
+    \r
+    a.h.callid = FTA_FREE_INSTANCE;\r
+    a.h.size = sizeof(struct fta_free_instance_arg);\r
+    a.subscriber=subscriber;\r
+    a.f=ftaid;\r
+    a.recursive=recursive;\r
+    ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);\r
+    if (sr->h.callid != STANDARD_RESULT) {\r
+        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+        return -1;\r
+    }\r
+    \r
+    /* make sure we remove the mapping*/\r
+    streamregistry_remove(ftaid);\r
+    \r
+    return sr->result;\r
+}\r
+\r
+gs_retval_t fta_control(FTAID subscriber,\r
+                        FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)\r
+{\r
+    gs_int8_t  rb[MAXRES];\r
+    struct fta_control_arg * a;\r
+    struct standard_result * sr = (struct standard_result *)rb;\r
+    \r
+    a = alloca(sizeof(struct fta_control_arg) + sz);\r
+    \r
+    a->h.callid = FTA_CONTROL;\r
+    a->h.size = sizeof(struct fta_control_arg)+ sz;\r
+    a->subscriber=subscriber;\r
+    a->f=ftaid;\r
+    a->command = command;\r
+    a->sz = sz;\r
+    memcpy(&a->data[0],value,sz);\r
+    \r
+    ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);\r
+    \r
+    if (sr->h.callid != STANDARD_RESULT) {\r
+        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+        return -1;\r
+    }\r
+    \r
+    return sr->result;\r
+}\r
+\r
+gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,\r
+                          gs_uint32_t  sz, fta_stat * trace){\r
+#ifdef CLEARINGHOUSE_HEARTBEAT\r
+    struct fta_heartbeat_arg  * a;\r
+    a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));\r
+    a->h.callid = FTA_HEARTBEAT;\r
+    a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));\r
+    a->sender=self;\r
+    a->trace_id=trace_id;\r
+    a->sz=sz;\r
+    if (sz!=0) {\r
+        memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));\r
+    }\r
+#ifdef PRINTMSG\r
+    fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "\r
+            "type %u with length %u\n",\r
+            (clearinghouseftaid.ip>>24)&0xff,\r
+            (clearinghouseftaid.ip>>16)&0xff,\r
+            (clearinghouseftaid.ip>>8)&0xff,\r
+            (clearinghouseftaid.ip)&0xff,\r
+            clearinghouseftaid.port,a->h.callid,a->h.size);\r
+#endif\r
+    if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+        return -1;\r
+    }\r
+#endif\r
+    return 0;\r
+}\r
+\r
+gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){\r
+    struct fta_notify_producer_failure_arg  a;\r
+    a.h.callid = FTA_PRODUCER_FAILURE;\r
+    a.h.size = sizeof(struct fta_notify_producer_failure_arg);\r
+    a.sender=self;\r
+    a.producer=producer;\r
+#ifdef PRINTMSG\r
+    fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "\r
+            "type %u with length %u\n",\r
+            (clearinghouseftaid.ip>>24)&0xff,\r
+            (clearinghouseftaid.ip>>16)&0xff,\r
+            (clearinghouseftaid.ip>>8)&0xff,\r
+            (clearinghouseftaid.ip)&0xff,\r
+            clearinghouseftaid.port,a.h.callid,a.h.size);\r
+#endif\r
+    if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+        return -1;\r
+    }\r
+    return 0;\r
+}\r
+\r
+gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)\r
+{\r
+    gs_int8_t  rb[MAXRES];\r
+    struct process_control_arg * a;\r
+    struct standard_result * sr = (struct standard_result *)rb;\r
+    \r
+    \r
+    a = alloca(sizeof(struct process_control_arg) + sz);\r
+    \r
+    a->h.callid = PROCESS_CONTROL;\r
+    a->h.size = sizeof(struct process_control_arg)+ sz;\r
+    a->command = command;\r
+    a->sz = sz;\r
+    memcpy(&a->data[0],value,sz);\r
+    \r
+    ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);\r
+    \r
+    if (sr->h.callid != STANDARD_RESULT) {\r
+        gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
+        return -1;\r
+    }\r
+    \r
+    return sr->result;\r
+}\r
+\r
+\r
+static void timeouthandler ()\r
+{\r
+    struct timeout_result a;\r
+    \r
+    a.h.callid=TIMEOUT;\r
+    a.h.size=sizeof(struct timeout_result);\r
+    if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {\r
+        gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
+    }\r
+}\r
+\r
+gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,\r
+                            gs_int32_t tbuf_size, gs_int32_t timeout)\r
+{\r
+    struct ringbuf * r;\r
+    FTAID from;\r
+    gs_int32_t length;\r
+    gs_int8_t  buf[MAXMSGSZ];\r
+    gs_int32_t lopp;\r
+    FTAID * f;\r
+    static     gs_uint64_t s1=0;\r
+    static     gs_uint64_t s2;\r
+    if (s1==0) {\r
+        s1=rdtsc();\r
+    }\r
+    s2=rdtsc();\r
+    cycles+=(s2-s1);\r
+start:\r
+#ifdef PRINTMSG\r
+    fprintf(stderr,"CHECK RINGBUFS\n");\r
+#endif\r
+#ifndef POLLING\r
+    /* use chance to cleanout message queue no reason\r
+     to keep anything else */\r
+    while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
+#endif\r
+    \r
+    streamregistry_getactiveringbuf_reset();\r
+    while ((r=streamregistry_getactiveringbuf())>0) {\r
+#ifdef PRINTMSG\r
+           fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"\r
+                "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
+                r->length);\r
+           if (UNREAD(r)) {\r
+            fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,\r
+                    (CURREAD(r)->f.ip>>24)&0xff,\r
+                    (CURREAD(r)->f.ip>>16)&0xff,\r
+                    (CURREAD(r)->f.ip>>8)&0xff,\r
+                    (CURREAD(r)->f.ip)&0xff,\r
+                    CURREAD(r)->f.port,\r
+                    CURREAD(r)->f.streamid,\r
+                    CURREAD(r)->sz);\r
+           }\r
+        \r
+#endif\r
+        if (UNREAD(r)) {\r
+            *ftaid=(CURREAD(r)->f);\r
+            *size=CURREAD(r)->sz;\r
+            memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);\r
+            intuple++;\r
+            inbytes+=CURREAD(r)->sz;\r
+            ADVANCEREAD(r);\r
+            s1=rdtsc();\r
+            return 0;\r
+        }\r
+    }\r
+    if (timeout == -1) {\r
+        *size=0;\r
+        s1=rdtsc();\r
+        return 1;\r
+    }\r
+    if (timeout !=0) {\r
+        signal(SIGALRM, timeouthandler);\r
+        alarm(timeout);\r
+    }\r
+    \r
+#ifndef POLLING\r
+#ifdef PRINTMSG\r
+    fprintf(stderr,"START BLOCKCALLS\n");\r
+#endif\r
+    streamregistry_getactiveftaid_reset();\r
+    while ((f=streamregistry_getactiveftaid())!=0) {\r
+        struct gscp_get_buffer_arg a;\r
+        a.h.callid = GSCP_GET_BUFFER;\r
+        a.h.size = sizeof(struct gscp_get_buffer_arg);\r
+        a.timeout = timeout;\r
+#ifdef PRINTMSG\r
+        fprintf(stderr,"Waiting for  %u.%u.%u.%u:%u\n",\r
+                (f->ip>>24)&0xff,\r
+                (f->ip>>16)&0xff,\r
+                (f->ip>>8)&0xff,\r
+                (f->ip)&0xff,\r
+                f->port\r
+                );\r
+#endif\r
+        if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {\r
+            s1=rdtsc();\r
+            return -1;\r
+        }\r
+    }\r
+#ifdef PRINTMSG\r
+    fprintf(stderr,"BLOCK\n");\r
+#endif\r
+    while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {\r
+#else  // If we poll we return after 100 msec\r
+    sleepagain:\r
+        while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {\r
+#endif\r
+            struct standard_result * sr = (struct standard_result *) buf;\r
+#ifdef PRINTMSG\r
+            fprintf(stderr,"Got return code %u\n",sr->h.callid);\r
+#endif\r
+            if (lopp==FTACALLBACK) {\r
+                if (timeout != 0) {\r
+                    signal(SIGALRM, SIG_IGN);\r
+                }\r
+                if (sr->h.callid == WAKEUP) {\r
+                    /* use chance to cleanout message queue no reason\r
+                     to keep anything else */\r
+                    while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
+                    goto start;\r
+                }\r
+                if (sr->h.callid == TIMEOUT) {\r
+                    /* use chance to cleanout message queue no reason\r
+                     to keep anything else */\r
+                    while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
+                    *size=0;\r
+                    s1=rdtsc();\r
+                    return 1;\r
+                }\r
+                if (sidequeue_append(from,buf,length)<0) {\r
+                    gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
+                    s1=rdtsc();\r
+                    return -1;\r
+                }\r
+            }\r
+        }\r
+#ifdef POLLING\r
+        streamregistry_getactiveringbuf_reset();\r
+        while ((r=streamregistry_getactiveringbuf())>0) {\r
+#ifdef PRINTMSG\r
+            fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"\r
+                    "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
+                    r->length);\r
+            if (UNREAD(r)) {\r
+                fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,\r
+                        (CURREAD(r)->f.ip>>24)&0xff,\r
+                        (CURREAD(r)->f.ip>>16)&0xff,\r
+                        (CURREAD(r)->f.ip>>8)&0xff,\r
+                        (CURREAD(r)->f.ip)&0xff,\r
+                        CURREAD(r)->f.port,\r
+                        CURREAD(r)->f.streamid,\r
+                        CURREAD(r)->sz);\r
+            }\r
+            \r
+#endif\r
+            if (UNREAD(r)) {\r
+                *ftaid=(CURREAD(r)->f);\r
+                *size=CURREAD(r)->sz;\r
+                memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);\r
+                intuple++;\r
+                inbytes+=CURREAD(r)->sz;\r
+                ADVANCEREAD(r);\r
+                if (timeout != 0) {\r
+                    signal(SIGALRM, SIG_IGN);\r
+                }\r
+                s1=rdtsc();\r
+                return 0;\r
+            }\r
+        }\r
+        goto sleepagain; // Try again\r
+#endif\r
+        gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");\r
+        /* we should never get here */\r
+        s1=rdtsc();\r
+        return -1;\r
+    }\r
+    \r
+    \r
+    \r