Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / gscpipc.c
index bee7b39..502c54c 100644 (file)
-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
\r
- http://www.apache.org/licenses/LICENSE-2.0\r
\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include "gsconfig.h"\r
-#include "gstypes.h"\r
-#include "gscpipc.h"\r
-#include "gshub.h"\r
-#include <sys/types.h>\r
-#include <sys/stat.h>\r
-#include <sys/ipc.h>\r
-#include <sys/shm.h>\r
-#include <stdlib.h>\r
-#include <stdio.h>\r
-#include <fcntl.h>\r
-#include <errno.h>\r
-#include <sys/socket.h>\r
-#include <netinet/in.h>\r
-\r
-#ifdef __linux__\r
-#include <sys/ioctl.h>\r
-#include <netinet/tcp.h>\r
-#include <linux/sockios.h>\r
-#endif\r
-\r
-#ifndef socklen_t\r
-#define socklen_t gs_uint32_t\r
-#endif\r
-\r
-\r
-struct FTAID clearinghouseftaid;\r
-\r
-\r
-\r
-struct connection {\r
-    gs_int32_t socket; /* socket for connection */\r
-    gs_int32_t used;   /* 1 if the entry is in use */\r
-    FTAID remoteid; /* remoteid of connection */\r
-};\r
-\r
-struct connection connectionhash[SOCKET_HASH_SZ];\r
-\r
-gs_int32_t clearinghouse=0;\r
-\r
-#define SHMTYPE 's'\r
-#define SHM_RECV 1\r
-#define SHM_SEND 2\r
-\r
-struct shmlistentry {\r
-    FTAID msgid;\r
-    gs_int32_t type;\r
-    key_t shmtoken;\r
-    gs_int32_t shmid;\r
-    struct ringbuf * buf;\r
-    gs_int32_t buffsize;\r
-};\r
-\r
-gs_int32_t shmlistlen=0;\r
-struct shmlistentry shmlist[MAX_NUMBER_OF_SHM];\r
-\r
-struct sq {\r
-    gs_int8_t buf[MAXMSGSZ];\r
-    gs_int32_t length;\r
-    struct sq * next;\r
-};\r
-\r
-static struct sq * sqtop=0;\r
-static struct sq * sqtail=0;\r
-\r
-/* adds a buffer to the end of the sidequeue*/\r
-gs_retval_t gscpipc_sidequeue_append(gs_sp_t  buf, gs_int32_t length)\r
-{\r
-    struct sq * s;\r
-    if ((s=malloc(sizeof(struct sq)))==0) {\r
-        gslog(LOG_EMERG,"Could not allocate memory for sidequeue");\r
-        return -1;\r
-    }\r
-    memcpy(&s->buf[0],buf,MAXMSGSZ);\r
-    s->length=length;\r
-    s->next=0;\r
-    if (sqtail) {\r
-        sqtail->next=s;\r
-        sqtail=s;\r
-    } else {\r
-        sqtop = s;\r
-        sqtail = s;\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* removes a buffer from the top of the sidequeue*/\r
-gs_retval_t gscpipc_sidequeue_pop(gs_sp_t  buf, gs_int32_t * length, gs_int32_t buflen)\r
-{\r
-    struct sq * s;\r
-    if (sqtop) {\r
-        if (sqtop->length > buflen) {\r
-            return -2;\r
-        }\r
-        memcpy(buf,&sqtop->buf[0],sqtop->length);\r
-        *length=sqtop->length;\r
-        s=sqtop;\r
-        sqtop=sqtop->next;\r
-        if (sqtop==0) sqtail=0;\r
-        free(s);\r
-        return 0;\r
-    }\r
-    return -1;\r
-}\r
-\r
-struct ipc_message {\r
-    FTAID receiver;\r
-    FTAID sender;\r
-    gs_int32_t operation;\r
-#ifdef PRINTMSG\r
-    gs_int32_t pmsgid;\r
-#endif\r
-    gs_int32_t  size;\r
-    gs_int8_t data[4];\r
-};\r
-\r
-#define LOWLEVELOP_ACK 0\r
-#define LOWLEVELOP_NACK 1\r
-#define LOWLEVELOP_REGISTER 2\r
-#define LOWLEVELOP_UNREGISTER 3\r
-#define LOWLEVELOP_SHM_REGISTER 4\r
-#define LOWLEVELOP_SHM_FREE 5\r
-#define LOWLEVELOP_SHM_REMOTE_REGISTER 6\r
-#define LOWLEVELOP_SHM_REMOTE_FREE 7\r
-#define LOWLEVELOP_REMOTE_TUPLE 8\r
-\r
-struct internal_message{\r
-    struct ipc_message im;\r
-    int lowlevelop;\r
-    key_t shmtoken;\r
-    int shmsz;\r
-    int result;\r
-};\r
-\r
-struct internal_remote_tuple{\r
-    struct ipc_message im;\r
-    int lowlevelop;\r
-    int size;\r
-    gs_int8_t data[4];\r
-};\r
-\r
-FTAID myid;\r
-int listen_socket;\r
-\r
-#ifdef PRINTMSG\r
-int pmsgid=0;\r
-#endif\r
-\r
-\r
-struct shmlistentry * shmlist_find(FTAID msgid, int type)\r
-{\r
-    int x;\r
-    for (x=0; x<shmlistlen; x++) {\r
-        if ((shmlist[x].msgid.ip == msgid.ip)\r
-            && (shmlist[x].msgid.port == msgid.port)\r
-            && (shmlist[x].type == type)) {\r
-            return &(shmlist[x]);\r
-        }\r
-    }\r
-    return 0;\r
-}\r
-\r
-gs_retval_t shmlist_add(FTAID msgid, int type, key_t shmtoken,\r
-                        int shmid, struct ringbuf * buf, int buffsize)\r
-{\r
-    if (shmlist_find(msgid, type) !=0) {\r
-        return -1;\r
-    }\r
-    if (shmlistlen>=MAX_NUMBER_OF_SHM) {\r
-        gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many"\r
-              "shm registered\n");\r
-        return -1;\r
-    }\r
-    shmlist[shmlistlen].msgid=msgid;\r
-    shmlist[shmlistlen].type=type;\r
-    shmlist[shmlistlen].shmtoken=shmtoken;\r
-    shmlist[shmlistlen].shmid=shmid;\r
-    shmlist[shmlistlen].buf=buf;\r
-    shmlist[shmlistlen].buffsize=buffsize;\r
-    shmlistlen++;\r
-    return 0;\r
-}\r
-\r
-static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type)\r
-{\r
-    gs_int32_t x;\r
-    gs_int32_t move=0;\r
-    for (x=0;x<shmlistlen;x++) {\r
-        if ((shmlist[x].msgid.ip == msgid.ip)\r
-            && (shmlist[x].msgid.port == msgid.port)\r
-            &&(shmlist[x].type==type)) {\r
-            shmlistlen--;\r
-            move=1;\r
-        }\r
-        if ((move==1)&&(x<shmlistlen)) {\r
-            shmlist[x]=shmlist[x+1];\r
-        }\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-/* starts the listen socket for the current proccess\r
- if the listen_port is 0 then a random port is assigned\r
- this function also sets myid\r
- */\r
-static gs_retval_t msg_init(gs_uint32_t clearinghouse) {\r
-    struct sockaddr_in sin;\r
-    gs_int32_t x;\r
-    FILE *f;\r
-    socklen_t sin_sz;\r
-    \r
-    /* mark all entries in the connection hash as unused */\r
-    for(x=0;x<SOCKET_HASH_SZ;x++) {\r
-        connectionhash[x].used=0;\r
-    }\r
-    \r
-    myid.index=0;\r
-    myid.streamid=0;\r
-    \r
-    bzero(&sin, sizeof(sin));\r
-    sin.sin_family = AF_INET;\r
-#ifdef __NetBSD__\r
-    sin.sin_len = sizeof(sin);\r
-#endif\r
-    sin.sin_addr.s_addr = 0;\r
-    sin.sin_port = 0;\r
-    \r
-    if ((listen_socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not create listen socket\n");\r
-        return -1;\r
-    }\r
-    \r
-    if (bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin)) < 0 ) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not bind to socket for ip %x port %u with error %u \n",\r
-              ntohl(sin.sin_addr.s_addr), ntohs(sin.sin_port),errno);\r
-        return -1;\r
-    }\r
-    \r
-    if (listen(listen_socket, 64) < 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not listen to socket for port %u \n",ntohs(sin.sin_port));\r
-        close(listen_socket);\r
-        return -1;\r
-    }\r
-    sin_sz=sizeof(sin);\r
-    if (getsockname(listen_socket, (struct sockaddr *) &sin, &sin_sz) < 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not get local port number of listen socket\n");\r
-        return -1;\r
-    }\r
-    \r
-    myid.port=ntohs(sin.sin_port);\r
-    myid.ip=ntohl(sin.sin_addr.s_addr);\r
-    \r
-    return 0;\r
-}\r
-\r
-static void closeconnection(gs_int32_t x) {\r
-    if (connectionhash[x].used==1)  {\r
-        close(connectionhash[x].socket);\r
-        connectionhash[x].used=0;\r
-    }\r
-}\r
-\r
-static gs_retval_t writeall(gs_int32_t socket, void * b, gs_int32_t sz) {\r
-    gs_int32_t rv;\r
-    gs_sp_t  buf = (gs_sp_t )b;\r
-    gs_int32_t res=sz;\r
-    while(sz>0) {\r
-        if ((rv=write(socket,buf,sz))<0) {\r
-            if (errno == EINTR)\r
-                continue;\r
-            else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS\r
-                return 0;\r
-            else\r
-                return -1;\r
-        }\r
-        sz-=rv;\r
-        buf+=rv;\r
-    }\r
-    return res;\r
-}\r
-\r
-static gs_retval_t msg_send(FTAID id, gs_sp_t  buf, gs_uint32_t len, gs_uint32_t block) {\r
-    struct sockaddr_in sin;\r
-    gs_int32_t x;\r
-    gs_int32_t u;\r
-    gs_int32_t sz;\r
-    gs_int32_t ret;\r
-    \r
-try_send_again:\r
-    u=-1;\r
-    for(x=0;x<SOCKET_HASH_SZ;x++) {\r
-        if (connectionhash[x].used==1) {\r
-            if ((connectionhash[x].remoteid.ip==id.ip)\r
-                && (connectionhash[x].remoteid.port==id.port)) {\r
-                sz=htonl(len);\r
-                if (block==0) {\r
-#ifdef __linux__\r
-                    gs_int32_t datainbuffer;\r
-                    if (ioctl(connectionhash[x].socket,SIOCOUTQ,&datainbuffer)<1) {\r
-                        gslog(LOG_EMERG,\r
-                              "GSCMSGQ::error::could not determin free "\r
-                              "space in write buffer errno %u\n",errno);\r
-                        return -1;\r
-                    }\r
-                    if ((SOCK_BUF_SZ-datainbuffer) < (len+sizeof(gs_uint32_t))) {\r
-                        return 1;\r
-                    }\r
-                    \r
-#else\r
-                    // low water mark in setsockoption is supported\r
-                    fd_set fs;\r
-                    gs_int32_t n;\r
-                    struct timeval tv;\r
-                    /* since we set the SNDLOWAT to MAXSZ+4 we know that if the write\r
-                     * select call returns with a 1 for that file descriptor at least that\r
-                     * much memory is available in the send buffer and we therefore\r
-                     * won't block sending\r
-                     */\r
-                    FD_ZERO(&fs);\r
-                    FD_SET(connectionhash[x].socket,&fs);\r
-                    n=connectionhash[x].socket+1;\r
-                    tv.tv_sec = 0;\r
-                    tv.tv_usec = 0;\r
-                    if(select(n,0,&fs,0,&tv)!=1) {\r
-                        return 1;\r
-                    }\r
-#endif\r
-                }\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"\twriting %u",ntohl(sz));\r
-#endif\r
-                ret = writeall(connectionhash[x].socket,&sz,sizeof(gs_uint32_t));\r
-                if (!ret)\r
-                    return 1;\r
-                else if (ret != sizeof(gs_uint32_t)) {\r
-                    gslog(LOG_EMERG,"GSCMSGQ::error::could not write length\n");\r
-                    return -1;\r
-                }\r
-                ret = writeall(connectionhash[x].socket,buf,len);\r
-                if (!ret)\r
-                    return 1;\r
-                else if (ret != len) {\r
-                    gslog(LOG_EMERG,"GSCMSGQ::error::could not write message\n");\r
-                    return -1;\r
-                }\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"...done\n");\r
-#endif\r
-                return 0;\r
-            }\r
-        } else {\r
-            if (u==-1) { u=x; }\r
-        }\r
-    }\r
-    /* ok we don't have a connection make one */\r
-    if ((u>=SOCKET_HASH_SZ) || (u<0)) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"\r
-              " TCP connection limit sending %d\n",u);\r
-        return -1;\r
-    }\r
-    connectionhash[u].remoteid=id;\r
-    connectionhash[u].remoteid.index=0;\r
-    connectionhash[u].remoteid.streamid=0;\r
-    sin.sin_family = AF_INET;\r
-#ifdef __NetBSD__\r
-    sin.sin_len = sizeof(sin);\r
-#endif\r
-    sin.sin_addr.s_addr = htonl(id.ip);\r
-    sin.sin_port = htons(id.port);\r
-    if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n");\r
-        return -1;\r
-    }\r
-    if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n");\r
-        return -1;\r
-    }\r
-    sz=SOCK_BUF_SZ;\r
-    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF,\r
-                   (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n");\r
-        return -1;\r
-    }\r
-    sz=SOCK_BUF_SZ;\r
-    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF,\r
-                   (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n");\r
-        return -1;\r
-    }\r
-#ifndef __linux__\r
-    // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead\r
-    sz=MAXSZ+4;\r
-    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT,\r
-                   (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno);\r
-        return -1;\r
-    }\r
-#endif\r
-    sz=1;\r
-    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE,\r
-                   (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
-        gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n");\r
-        return -1;\r
-    }\r
-    connectionhash[u].used=1;\r
-    goto try_send_again;\r
-    return -1;\r
-}\r
-\r
-static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) {\r
-    gs_int32_t rv;\r
-    gs_sp_t  buf = (gs_sp_t )b;\r
-    gs_int32_t res=sz;\r
-    while(sz>0) {\r
-        if ((rv=read(socket,buf,sz))<0) {\r
-            if (errno == EINTR)\r
-                continue;\r
-            gslog(LOG_EMERG,"read with error number %u \n",errno);\r
-            return -1;\r
-        }\r
-        if (rv==0) {\r
-            return 0;\r
-        }\r
-        sz-=rv;\r
-        buf+=rv;\r
-    }\r
-    return res;\r
-}\r
-\r
-/* msg_recv return len if data is available -1 for a timeout and -2 for an error */\r
-static gs_retval_t msg_recv(gs_sp_t  buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) {\r
-    struct sockaddr_in sin;\r
-    socklen_t sl;\r
-    fd_set fs;\r
-    gs_int32_t x,y;\r
-    gs_int32_t n;\r
-    gs_int32_t length;\r
-    struct timeval tv;\r
-    gs_int32_t sret;\r
-    static gs_int32_t last=0;\r
-    gs_uint32_t sz;\r
-    \r
-    if (check_sideque==1) {\r
-        if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) {\r
-            return length;\r
-        }\r
-        if (x==-2) {\r
-            gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n");\r
-            return -2;\r
-        }\r
-    }\r
-    \r
-read_again:\r
-    if (block==0) {\r
-        tv.tv_sec = 0;\r
-        tv.tv_usec = 0;\r
-    } else {\r
-        tv.tv_sec = 0;\r
-        tv.tv_usec = 100000;\r
-    }\r
-    FD_ZERO(&fs);\r
-    FD_SET(listen_socket,&fs);\r
-    n=listen_socket;\r
-    for(x=0;x<SOCKET_HASH_SZ;x++) {\r
-        if (connectionhash[x].used==1) {\r
-            FD_SET(connectionhash[x].socket,&fs);\r
-            if (n<connectionhash[x].socket) {\r
-                n=connectionhash[x].socket;\r
-            }\r
-        }\r
-    }\r
-    n=n+1;\r
-    // now block\r
-    sret=select(n,&fs,0,0,&tv);\r
-    if ((sret<0) && (errno!=EINTR)) {\r
-        gslog(LOG_EMERG,"Select with error %u\n",errno);\r
-        return -2;\r
-    }\r
-    if (sret<=0) {\r
-        return -1;\r
-    }\r
-    if (FD_ISSET(listen_socket,&fs)) {\r
-        for (x=0;(x<SOCKET_HASH_SZ) && (connectionhash[x].used !=0) ;x++);\r
-        if (x>=SOCKET_HASH_SZ) {\r
-            gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"\r
-                  "TCP connection limit accepting\n");\r
-            goto read_again;\r
-        }\r
-        sl=sizeof(sin);\r
-        if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl))\r
-                   < 0) {\r
-            gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n");\r
-            goto read_again;\r
-        }\r
-        \r
-        sz=SOCK_BUF_SZ;\r
-        if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF,\r
-                       (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
-            fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n");\r
-            return -1;\r
-        }\r
-        sz=SOCK_BUF_SZ;\r
-        if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF,\r
-                       (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
-            fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n");\r
-            return -1;\r
-        }\r
-        \r
-        sl=sizeof(sin);\r
-        if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) {\r
-            gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n");\r
-            close(connectionhash[x].socket);\r
-            goto read_again;\r
-        }\r
-        connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr);\r
-        connectionhash[x].remoteid.port=ntohs(sin.sin_port);\r
-        connectionhash[x].remoteid.index=0;\r
-        connectionhash[x].remoteid.streamid=0;\r
-        connectionhash[x].used=1;\r
-#ifdef PRINTMSG\r
-        gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port);\r
-#endif\r
-        goto read_again;\r
-    }\r
-    for(x=0;x<SOCKET_HASH_SZ;x++) {\r
-               last=(last+1)%SOCKET_HASH_SZ;\r
-        if ((connectionhash[last].used==1) && (FD_ISSET(connectionhash[last].socket,&fs))){\r
-            gs_int32_t rsz;\r
-#ifdef PRINTMSG\r
-            gslog(LOG_EMERG,"reading sret:%d block:%u...",sret,block);\r
-#endif\r
-            if ((rsz=readall(connectionhash[last].socket,&length,sizeof(gs_uint32_t)))!=sizeof(gs_uint32_t)) {\r
-                closeconnection(last);\r
-                gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing %u res %d\n",\r
-                      connectionhash[last].remoteid.port,rsz);\r
-                continue;\r
-            }\r
-            length=ntohl(length);\r
-            if (buflen<length) {\r
-                gs_int8_t d;\r
-                gslog(LOG_EMERG,"GSCMSGQ::error::message to long (%u) for receive buffer (%u)\n",length,\r
-                      buflen);\r
-                /* remove the data */\r
-                for(y=0;y<length;y++)\r
-                    if (readall(connectionhash[last].socket,&d,1)!=1) {\r
-                        closeconnection(last);\r
-                        gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing for receive buffer mismatch\n");\r
-                        return -2;\r
-                    }\r
-                return -2;\r
-            }\r
-            if (readall(connectionhash[last].socket,buf,length)!=length) {\r
-                closeconnection(last);\r
-                gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing on data read\n");\r
-                continue;\r
-            }\r
-#ifdef PRINTMSG\r
-            gslog(LOG_EMERG,"reading done\n");\r
-#endif\r
-            return length;\r
-        }\r
-    }\r
-    return -1;\r
-}\r
-\r
-static gs_retval_t send_nack(FTAID recid) {\r
-    struct internal_message i;\r
-    i.im.receiver = recid;\r
-    i.im.sender = myid;\r
-    i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-    i.im.size = sizeof(struct internal_message);\r
-    i.lowlevelop = LOWLEVELOP_NACK;\r
-    if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {\r
-        return 0;\r
-    } else {\r
-        return -1;\r
-    }\r
-    return -1;\r
-}\r
-\r
-static gs_retval_t send_ack(FTAID recid) {\r
-    struct internal_message i;\r
-    i.im.receiver = recid;\r
-    i.im.sender = myid;\r
-    i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-    i.im.size = sizeof(struct internal_message);\r
-    i.lowlevelop = LOWLEVELOP_ACK;\r
-    if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {\r
-        return 0;\r
-    } else {\r
-        return -1;\r
-    }\r
-    return -1;\r
-}\r
-\r
-static gs_retval_t wait_for_lowlevel_ack() {\r
-    gs_int8_t b[MAXMSGSZ];\r
-    struct internal_message * i;\r
-    gs_int32_t res;\r
-    i=(struct internal_message *)b;\r
-    \r
-    /* this is bussy waiting if there is another\r
-     message waiting to be processed so make sure it is only\r
-     used where bussy waiting is OK. If that becomes\r
-     a probelm add a local queue\r
-     */\r
-    \r
-    while( 1==1) {\r
-        if ((res=msg_recv(b, MAXMSGSZ,1,0))>0) {\r
-            if ((i->im.operation == RESERVED_FOR_LOW_LEVEL)\r
-                && (( i->lowlevelop ==  LOWLEVELOP_ACK)\r
-                    || ( i->lowlevelop ==  LOWLEVELOP_NACK))) {\r
-                    if ( i->lowlevelop ==  LOWLEVELOP_ACK) {\r
-                        return 0;\r
-                    } else {\r
-                        return 1;\r
-                    }\r
-                }\r
-                       if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) {\r
-                               struct internal_remote_tuple * it;\r
-                               struct shmlistentry * s;\r
-                               it=(struct internal_remote_tuple *)b;\r
-                \r
-                               if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {\r
-#ifdef PRINTMSG\r
-                                       gslog(LOG_EMERG,"Received remote ringbuf message "\r
-                          "for message of size %u\n",\r
-                          it->size,it->im.size);\r
-#endif\r
-#ifdef BLOCKRINGBUFFER\r
-                                       while (SPACETOWRITE(s->buf)==0) {\r
-                                               usleep(1000);\r
-                                               gslog(LOG_ERR,"Dead in the water we can't "\r
-                              "drain the ringbuffer we wait for.");\r
-                                       }\r
-                                       memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
-                                       ADVANCEWRITE(s->buf);\r
-#else\r
-                                       if (SPACETOWRITE(s->buf)) {\r
-                                               memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
-                                               ADVANCEWRITE(s->buf);\r
-                                       } else {\r
-                        //                                             gslog(LOG_EMERG,"r+");\r
-                                       }\r
-#endif\r
-                               } else {\r
-                                       gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");\r
-                               }\r
-                       } else {\r
-                               gscpipc_sidequeue_append(b, res);\r
-                       }\r
-        }  else {\r
-            if (res < -1) {\r
-                /* got an error here */\r
-                gslog(LOG_EMERG,"hostlib::error::received error "\r
-                      "during wait for low level ack\n");\r
-                return -1;\r
-            }\r
-        }\r
-    }\r
-    /* never reached */\r
-    return -1;\r
-}\r
-\r
-void shmlist_drain_remote()\r
-{\r
-    gs_int32_t x;\r
-    gs_int8_t buf[MAXSZ];\r
-    struct internal_remote_tuple * it;\r
-    it = (struct internal_remote_tuple *) buf;\r
-    for (x=0; x<shmlistlen; x++) {\r
-        if ((shmlist[x].msgid.ip != myid.ip)&& (shmlist[x].type==SHM_SEND)) {\r
-            while (UNREAD(shmlist[x].buf)) {\r
-                it->im.receiver = shmlist[x].msgid;\r
-                it->im.sender = myid;\r
-                it->im.operation = RESERVED_FOR_LOW_LEVEL;\r
-                it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE;\r
-                it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1;\r
-                it->im.size = sizeof(struct internal_remote_tuple)-4+it->size;\r
-                memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size);\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n",\r
-                      it->size, it->im.size);\r
-#endif\r
-                if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) {\r
-                    break;\r
-                }\r
-                ADVANCEREAD(shmlist[x].buf);\r
-            }\r
-        }\r
-    }\r
-}\r
-\r
-/*\r
- *used to contact the clearinghouse process returns the MSGID of\r
- * the current process\r
- */\r
-gs_retval_t gscpipc_init(gs_int32_t clearinghouse)\r
-{\r
-    struct internal_message i;\r
-    key_t msqtoken=0;\r
-    gs_int32_t x;\r
-    endpoint gshub;\r
-    endpoint tmpclearinghouse;\r
-    \r
-    /* make sure priveleges can be set */\r
-    umask(0);\r
-    \r
-    clearinghouseftaid.index=0;\r
-    clearinghouseftaid.streamid=0;\r
-    \r
-    if (get_hub(&gshub)!=0) {\r
-        gslog(LOG_EMERG,"hostlib::error::could get hub\n");\r
-        return -1;\r
-    }\r
-    \r
-    if (clearinghouse!=0) {\r
-        // This is the clearinghouse\r
-        gs_int8_t buf[MAXMSGSZ];\r
-        \r
-        if (msg_init(1)<0) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");\r
-            return -1;\r
-        }\r
-        \r
-        clearinghouseftaid.ip=myid.ip;\r
-        clearinghouseftaid.port=myid.port;\r
-        \r
-        tmpclearinghouse.ip=htonl(clearinghouseftaid.ip);\r
-        tmpclearinghouse.port=htons(clearinghouseftaid.port);\r
-        \r
-        if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) {\r
-            gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance");\r
-            return -1;\r
-        }\r
-        \r
-        return 0;\r
-        \r
-    } else {\r
-        // This is an lfta/hfta/app\r
-        gs_int32_t res;\r
-        \r
-        if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n");\r
-            return -1;\r
-        }\r
-        \r
-        clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip);\r
-        clearinghouseftaid.port=ntohs(tmpclearinghouse.port);\r
-        \r
-        \r
-        if (msg_init(0)<0) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");\r
-            return -1;\r
-        }\r
-        \r
-        i.im.receiver = clearinghouseftaid;\r
-        i.im.sender = myid;\r
-        i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-        i.im.size = sizeof(struct internal_message);\r
-        i.lowlevelop = LOWLEVELOP_REGISTER;\r
-#ifdef PRINTMSG\r
-        i.im.pmsgid=pmsgid;\r
-        pmsgid++;\r
-        gslog(LOG_EMERG,"send a message (%d.%u) to %u with op "\r
-              "%u with size %u\n",\r
-              i.im.pmsgid,i.im.receiver.port,i.im.operation,\r
-              i.im.size);\r
-#endif\r
-        if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) {\r
-            /* we can wait her for an ack since nobody should know\r
-             about us yet */\r
-        init_read_again:\r
-            if ((res=msg_recv((gs_sp_t )&i,\r
-                              sizeof(struct internal_message),\r
-                              1,0))>=0) {\r
-                if (i.lowlevelop == LOWLEVELOP_ACK) {\r
-                    return 0;\r
-                } else {\r
-                    gslog(LOG_EMERG,"hostlib::error::received unexpected message "\r
-                          "during initalization\n");\r
-                    return -1;\r
-                }\r
-            } else {\r
-                if (res<-1) {\r
-                    /* got an error here */\r
-                    gslog(LOG_EMERG,"hostlib::error::received error message "\r
-                          "during initalization\n");\r
-                    return -1;\r
-                } else {\r
-                    goto init_read_again;\r
-                }\r
-            }\r
-        }\r
-        gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n");\r
-        return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-\r
-static gs_retval_t gscpdetachshm(FTAID target)\r
-{\r
-    struct internal_message i;\r
-    \r
-    i.im.receiver = target;\r
-    i.im.sender = myid;\r
-    i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-    i.im.size = sizeof(struct internal_message);\r
-    i.lowlevelop = LOWLEVELOP_SHM_FREE;\r
-    if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) {\r
-        /* no reason to wait here won't be acked anyway */\r
-        return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-static gs_retval_t gscpdetachsocket(FTAID target)\r
-{\r
-    struct internal_message i;\r
-    \r
-    i.im.receiver = target;\r
-    i.im.sender = myid;\r
-    i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-    i.im.size = sizeof(struct internal_message);\r
-    i.lowlevelop = LOWLEVELOP_UNREGISTER;\r
-    if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) {\r
-        /* no reason to wait here won't be acked anyway */\r
-        return -1;\r
-    }\r
-    return -1;\r
-}\r
-\r
-\r
-/* used to disassociate process from clearinghouse */\r
-gs_retval_t gscpipc_free()\r
-{\r
-    gs_int32_t x;\r
-    /* XXX OS if this function is called when there are still\r
-     subscribed FTAs for this process the clearinghouse will\r
-     crash\r
-     */\r
-    \r
-    if (clearinghouse!=0) {\r
-        return 0;\r
-    } else {\r
-        gs_int32_t x;\r
-        for (x=0; x<shmlistlen; x++) {\r
-            if (shmlist[x].type==SHM_RECV) {\r
-                gscpdetachshm(shmlist[x].msgid);\r
-                if (shmdt((gs_sp_t )shmlist[x].buf)!=0) {\r
-                    gslog(LOG_EMERG,"hostlib::error::could not "\r
-                          "detach shared memory\n");\r
-                }\r
-            } else {\r
-                gslog(LOG_EMERG,"hostlib::error::porccess freed while still "\r
-                      "attached to sending shared memory\n");\r
-            }\r
-        }\r
-    }\r
-    /* remove connection */\r
-    for(x=0;x<SOCKET_HASH_SZ;x++) {\r
-        if (connectionhash[x].used==1) {\r
-            //XXX detach does not work due to interleaved messages\r
-            // gscpdetachsocket(connectionhash[x].remoteid);\r
-            close(connectionhash[x].socket);\r
-        }\r
-    }\r
-    close(listen_socket);\r
-    return 0;\r
-}\r
-\r
-/* returns MSGID of current process */\r
-FTAID gscpipc_getftaid()\r
-{\r
-    return myid;\r
-}\r
-\r
-\r
-/* sends a message to a process */\r
-gs_retval_t gscpipc_send(FTAID f, gs_int32_t operation, gs_sp_t  buf, gs_int32_t length, gs_int32_t block)\r
-{\r
-    gs_int8_t b[MAXMSGSZ];\r
-    struct ipc_message * i;\r
-    struct shmlistentry * s;\r
-    if (length > MAXMSGSZ) {\r
-        gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n");\r
-        return -1;\r
-    }\r
-    i = (struct ipc_message *) b;\r
-    i->receiver=f;\r
-    i->sender=myid;\r
-    i->operation=operation;\r
-    i->size=length+sizeof(struct ipc_message);\r
-    memcpy(&i->data[0],buf,length);\r
-#ifdef PRINTMSG\r
-    i->pmsgid=pmsgid;\r
-    pmsgid++;\r
-    gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n",\r
-          i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length);\r
-#endif\r
-    if ((s=shmlist_find(f, SHM_RECV))!=0) {\r
-        // set the hint in the ringbuffer that there is something on the shared memory queue\r
-        s->buf->mqhint=1;\r
-    }\r
-    if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) {\r
-        gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno);\r
-        return -1;\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* retrieve a message buf has to be at least of size MAXMSGSZ*/\r
-gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t  buf, gs_int32_t * size, gs_int32_t block)\r
-{\r
-    gs_int32_t w;\r
-    gs_int32_t x;\r
-    gs_int8_t b[MAXSZ];\r
-       gs_int32_t y;\r
-    \r
-    struct internal_message * i;\r
-    struct internal_remote_tuple * it;\r
-    gs_int32_t length;\r
-    \r
-    struct shmlistentry * s;\r
-    \r
-    i=(struct internal_message *)b;\r
-    it=(struct internal_remote_tuple *)b;\r
-    \r
-    for(y=0;(y < 10) || (block==1);y++) {\r
-        shmlist_drain_remote();\r
-               length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1);\r
-               if (length < -1) {\r
-                       /* problem */\r
-                       return -1;\r
-               }\r
-               if (length < 0) {\r
-                       /* we are nonblocking and have nothing to do */\r
-            if (block==1) {\r
-                // we are expected to block for ever if it is 0 or 2 we return\r
-                continue;\r
-            }\r
-                       return 0;\r
-               }\r
-#ifdef PRINTMSG\r
-               gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n",\r
-              i->im.pmsgid,  i->im.sender, i->im.sender,i->im.operation,i->im.size);\r
-#endif\r
-               if (length <0) {\r
-                       gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno);\r
-                       return -1;\r
-               }\r
-               if (i->im.operation != RESERVED_FOR_LOW_LEVEL) {\r
-                       memcpy(buf,&(i->im.data[0]),i->im.size);\r
-                       *size=i->im.size-sizeof(struct ipc_message);\r
-                       *operation=i->im.operation;\r
-                       *f=i->im.sender;\r
-                       if ((s=shmlist_find(*f, SHM_SEND))!=0) {\r
-                               // clear the hint in the ringbuffer to indicate we got the message\r
-                s->buf->mqhint=0;\r
-                       }\r
-                       return 1;\r
-               }\r
-               switch (i->lowlevelop) {\r
-            case LOWLEVELOP_REGISTER:\r
-                /* this should only get called if the process is the clearinghouse */\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port);\r
-#endif\r
-                send_ack(i->im.sender);\r
-                break;\r
-            case LOWLEVELOP_UNREGISTER:\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port);\r
-#endif    /* remove connection */\r
-                for(x=0;x<SOCKET_HASH_SZ;x++) {\r
-                    if ( (connectionhash[x].used==1)\r
-                        && (connectionhash[x].remoteid.ip==i->im.sender.ip)\r
-                        && (connectionhash[x].remoteid.port==i->im.sender.port)) {\r
-                       gslog(LOG_EMERG,"Close by remote request %u\n",\r
-                              connectionhash[x].remoteid.port);\r
-                        // XXX closed when the other process dies\r
-                        // can't close it yet since we might have\r
-                        // some more messages\r
-                        // close(connectionhash[x].socket);\r
-                        connectionhash[x].used=0;\r
-                    }\r
-                }\r
-                break;\r
-            case LOWLEVELOP_SHM_REGISTER:\r
-            {\r
-                gs_int32_t shmid;\r
-                struct ringbuf * r;\r
-                struct shmid_ds sms;\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n",\r
-                      i->im.sender.port,\r
-                      i->shmtoken,i->shmsz);\r
-#endif\r
-                if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) {\r
-                    if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) {\r
-                        gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno);\r
-                        send_nack(i->im.sender);\r
-                    } else {\r
-                        // Make sure all the momory gets mapped now\r
-                                       for(x=0;x<length;x=x+1024) {\r
-                            ((gs_uint8_t *) r)[x]=0;\r
-                                       }\r
-                        \r
-#ifdef PRINTMSG\r
-                        gslog(LOG_EMERG,"Got a ring buffer at address %p (%u %u %u %u)\n"\r
-                              ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);\r
-#endif\r
-                        if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken,\r
-                                        shmid,r,i->shmsz)<0) {\r
-                            shmdt((gs_sp_t )r);\r
-                            shmctl(shmid,IPC_RMID,&sms);\r
-                            gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n");\r
-                            send_nack(i->im.sender);\r
-                        } else {\r
-                            send_ack(i->im.sender);\r
-                        }\r
-                    }\r
-                } else {\r
-                    gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno);\r
-                    send_nack(i->im.sender);\r
-                }\r
-            }\r
-                break;\r
-            case LOWLEVELOP_SHM_REMOTE_REGISTER:\r
-            {\r
-                gs_int32_t shmid;\r
-                struct ringbuf * r;\r
-                struct shmid_ds sms;\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"request to get remote shm %u  size %u\n",\r
-                      i->im.sender.port,i->shmsz);\r
-#endif\r
-                if ((r=(struct ringbuf *)malloc(i->shmsz))==0) {\r
-                    gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno);\r
-                    send_nack(i->im.sender);\r
-                } else {\r
-                    // make sure all the memory gets mapped now\r
-                    for(x=0;x<length;x=x+1024) {\r
-                        ((gs_uint8_t *) r)[x]=0;\r
-                    }\r
-                    r->reader=0;\r
-                    r->writer=0;\r
-                    r->length=i->shmsz;\r
-                    r->end= i->shmsz-MAXTUPLESZ;\r
-                    r->destid=i->im.receiver;\r
-                    r->srcid=i->im.sender;\r
-#ifdef PRINTMSG\r
-                    gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n"\r
-                          ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);\r
-#endif\r
-                    if (shmlist_add(i->im.sender,SHM_SEND,0,\r
-                                    0,r,i->shmsz)<0) {\r
-                        gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n");\r
-                        send_nack(i->im.sender);\r
-                    } else {\r
-                        send_ack(i->im.sender);\r
-                    }\r
-                }\r
-            }\r
-                break;\r
-            case LOWLEVELOP_SHM_FREE:\r
-            {\r
-                struct shmlistentry * sm;\r
-                struct shmid_ds sms;\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);\r
-#endif\r
-                if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {\r
-#ifdef PRINTMSG\r
-                    gslog(LOG_EMERG,"freeing %u",sm->shmid);\r
-#endif\r
-                    shmdt((gs_sp_t )sm->buf);\r
-                    shmctl(sm->shmid,IPC_RMID,&sms);\r
-                    shmlist_rm(i->im.sender,SHM_SEND);\r
-                }\r
-            }\r
-                break;\r
-            case LOWLEVELOP_SHM_REMOTE_FREE:\r
-            {\r
-                struct shmlistentry * sm;\r
-                struct shmid_ds sms;\r
-#ifdef PRINTMSG\r
-                gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);\r
-#endif\r
-                if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {\r
-#ifdef PRINTMSG\r
-                    gslog(LOG_EMERG,"freeing %u",sm->shmid);\r
-#endif\r
-                    free((gs_sp_t ) sm->buf);\r
-                    shmlist_rm(i->im.sender,SHM_SEND);\r
-                }\r
-            }\r
-                break;\r
-            case LOWLEVELOP_REMOTE_TUPLE:\r
-            {\r
-                if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {\r
-#ifdef PRINTMSG\r
-                    gslog(LOG_EMERG,"Received remote ringbuf message "\r
-                          "for message of size %u\n",\r
-                          it->size,it->im.size);\r
-#endif\r
-#ifdef BLOCKRINGBUFFER\r
-                    while (SPACETOWRITE(s->buf)==0) {\r
-                        usleep(1000);\r
-                        gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for.");\r
-                    }\r
-                    memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
-                    ADVANCEWRITE(s->buf);\r
-#else\r
-                    if (SPACETOWRITE(s->buf)) {\r
-                        memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
-                        ADVANCEWRITE(s->buf);\r
-                    } else {\r
-                        //                   gslog(LOG_EMERG,"r+");\r
-                    }\r
-#endif\r
-                } else {\r
-                    gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");\r
-                }\r
-            }\r
-                break;\r
-            default:\r
-                gslog(LOG_EMERG,"hostlib::error::unexpected message received\n");\r
-                return -1;\r
-               }\r
-    }\r
-    return 0;\r
-}\r
-\r
-/* allocate a ringbuffer which allows receiving data from\r
- * the other process. returns 0 if didn't succeed and\r
- * returns an existing buffer if it exists */\r
-struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length)\r
-{\r
-    struct shmid_ds sms;\r
-    struct shmlistentry * se;\r
-    gs_int8_t keybuf[1024];\r
-    key_t shmtoken=0;\r
-    gs_int32_t shmid=0;\r
-    struct ringbuf * r;\r
-    struct internal_message i;\r
-    gs_int32_t x;\r
-    \r
-    se = shmlist_find(f, SHM_RECV);\r
-    if (se) {\r
-        return se->buf;\r
-    }\r
-    \r
-    if (length<(4*MAXTUPLESZ)) {\r
-        gslog(LOG_EMERG,\r
-              "ERROR:buffersize in gscpipc_createshm  has to be "\r
-              "at least %u Bytes long\n",\r
-              4*MAXTUPLESZ);\r
-        return 0;\r
-    }\r
-    \r
-    if (myid.ip == f.ip) {\r
-        sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port);\r
-        if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1)  {\r
-            gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n");\r
-            return 0;\r
-        }\r
-        close(x);\r
-        \r
-        if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n");\r
-            return 0;\r
-        }\r
-        \r
-        if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL|\r
-                                        IPC_CREAT|IPC_EXCL))==-1) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno);\r
-            return 0;\r
-        }\r
-        if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n");\r
-            return 0;\r
-        }\r
-        /* touch all memory once to map/reserve it now */\r
-        for(x=0;x<length;x=x+1024) {\r
-            ((gs_uint8_t *) r)[x]=0;\r
-        }\r
-        r->reader=0;\r
-        r->writer=0;\r
-        r->length=length;\r
-        r->end= r->length-MAXTUPLESZ;\r
-        r->destid=f;\r
-        r->srcid=myid;\r
-        i.im.receiver = f;\r
-        i.im.sender = myid;\r
-        i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-        i.im.size = sizeof(struct internal_message);\r
-        i.lowlevelop = LOWLEVELOP_SHM_REGISTER;\r
-        i.shmtoken = shmtoken;\r
-        i.shmsz=length;\r
-        if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {\r
-            if (wait_for_lowlevel_ack()<0) {\r
-                shmdt((gs_sp_t )r);\r
-                shmctl(shmid,IPC_RMID,&sms);\r
-                return 0;\r
-            }\r
-        }\r
-        shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */  \r
-        if (shmlist_add(f, SHM_RECV, shmtoken, \r
-                        shmid, r, length) <0) {\r
-            shmdt((gs_sp_t )r);\r
-            shmctl(shmid,IPC_RMID,&sms);\r
-            i.im.receiver = f;\r
-            i.im.sender = myid;\r
-            i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-            i.im.size = sizeof(struct internal_message);\r
-            i.lowlevelop = LOWLEVELOP_SHM_FREE;\r
-            i.shmtoken = shmtoken;\r
-            msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
-            return 0;\r
-        }\r
-    } else {\r
-        /* remote shared memory */\r
-        if ((r=(struct ringbuf *)malloc(length))==0) {\r
-            gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n");\r
-            return 0;\r
-        }\r
-        /* touch all memory once to map/reserve it now */\r
-        for(x=0;x<length;x=x+1024) {\r
-            ((gs_uint8_t *) r)[x]=0;\r
-        }\r
-        r->reader=0;\r
-        r->writer=0;\r
-        r->length=length;\r
-        r->end= r->length-MAXTUPLESZ;\r
-        r->destid=f;\r
-        r->srcid=myid;\r
-        i.im.receiver = f;\r
-        i.im.sender = myid;\r
-        i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-        i.im.size = sizeof(struct internal_message);\r
-        i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER;\r
-        i.shmtoken = shmtoken;\r
-        i.shmsz=length;\r
-        if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {\r
-            if (wait_for_lowlevel_ack()<0) {\r
-                free(r);\r
-                return 0;\r
-            }\r
-        }\r
-        if (shmlist_add(f, SHM_RECV, 0, \r
-                        0, r, length) <0) {\r
-            free(r);\r
-            i.im.receiver = f;\r
-            i.im.sender = myid;\r
-            i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-            i.im.size = sizeof(struct internal_message);\r
-            i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;\r
-            msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
-            return 0;\r
-        }\r
-    }\r
-    return r;\r
-       \r
-}\r
-\r
-/* finds a ringbuffer to send which was allocated by\r
- * gscpipc_creatshm and return 0 on an error */\r
-\r
-struct ringbuf * gscpipc_getshm(FTAID f) \r
-{\r
-    struct shmlistentry * se;\r
-    gs_int32_t recmsgid;\r
-    se = shmlist_find(f, SHM_SEND);\r
-    if (se) {\r
-        return se->buf;\r
-    }\r
-    return 0;\r
-}     \r
-\r
-/* frees shared memory to a particular proccess identified\r
- * by MSGID\r
- */ \r
-gs_retval_t gscpipc_freeshm(FTAID f)\r
-{\r
-    struct internal_message i;\r
-    struct shmlistentry * sm;\r
-    struct shmid_ds sms;\r
-    if (myid.ip == f.ip) { \r
-        if ((sm=shmlist_find(f, SHM_RECV)) <0) {\r
-            shmdt((gs_sp_t )sm->buf);\r
-            shmctl(sm->shmid,IPC_RMID,&sms);\r
-            i.im.receiver = f;\r
-            i.im.sender = myid;\r
-            i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-            i.im.size = sizeof(struct internal_message);\r
-            i.lowlevelop = LOWLEVELOP_SHM_FREE;\r
-            i.shmtoken = sm->shmtoken;\r
-            msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
-            shmlist_rm(f, SHM_RECV);\r
-        }\r
-    } else {\r
-        if ((sm=shmlist_find(f, SHM_RECV)) <0) {\r
-            free((gs_sp_t )sm->buf);\r
-            i.im.receiver = f;\r
-            i.im.sender = myid;\r
-            i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
-            i.im.size = sizeof(struct internal_message);\r
-            i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;\r
-            msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
-            shmlist_rm(f, SHM_RECV);\r
-        }\r
-    }    \r
-    return 0;\r
-}\r
-\r
-gs_retval_t gscpipc_mqhint()\r
-{\r
-    gs_int32_t x;\r
-    for (x=0; x<shmlistlen; x++) {\r
-        if (shmlist[x].type == SHM_SEND) {\r
-            if (shmlist[x].buf->mqhint) {\r
-                return 1;\r
-            }\r
-        }\r
-    }\r
-    return 0;\r
-}\r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "gscpipc.h"
+#include "gshub.h"
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#ifdef __linux__
+#include <sys/ioctl.h>
+#include <netinet/tcp.h>
+#include <linux/sockios.h>
+#endif
+
+#ifndef socklen_t
+#define socklen_t gs_uint32_t
+#endif
+
+
+struct FTAID clearinghouseftaid;
+
+
+
+struct connection {
+    gs_int32_t socket; /* socket for connection */
+    gs_int32_t used;   /* 1 if the entry is in use */
+    FTAID remoteid; /* remoteid of connection */
+};
+
+struct connection connectionhash[SOCKET_HASH_SZ];
+
+gs_int32_t clearinghouse=0;
+
+#define SHMTYPE 's'
+#define SHM_RECV 1
+#define SHM_SEND 2
+
+struct shmlistentry {
+    FTAID msgid;
+    gs_int32_t type;
+    key_t shmtoken;
+    gs_int32_t shmid;
+    struct ringbuf * buf;
+    gs_int32_t buffsize;
+};
+
+gs_int32_t shmlistlen=0;
+struct shmlistentry shmlist[MAX_NUMBER_OF_SHM];
+
+struct sq {
+    gs_int8_t buf[MAXMSGSZ];
+    gs_int32_t length;
+    struct sq * next;
+};
+
+static struct sq * sqtop=0;
+static struct sq * sqtail=0;
+
+/* adds a buffer to the end of the sidequeue*/
+gs_retval_t gscpipc_sidequeue_append(gs_sp_t  buf, gs_int32_t length)
+{
+    struct sq * s;
+    if ((s=malloc(sizeof(struct sq)))==0) {
+        gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
+        return -1;
+    }
+    memcpy(&s->buf[0],buf,MAXMSGSZ);
+    s->length=length;
+    s->next=0;
+    if (sqtail) {
+        sqtail->next=s;
+        sqtail=s;
+    } else {
+        sqtop = s;
+        sqtail = s;
+    }
+    return 0;
+}
+
+/* removes a buffer from the top of the sidequeue*/
+gs_retval_t gscpipc_sidequeue_pop(gs_sp_t  buf, gs_int32_t * length, gs_int32_t buflen)
+{
+    struct sq * s;
+    if (sqtop) {
+        if (sqtop->length > buflen) {
+            return -2;
+        }
+        memcpy(buf,&sqtop->buf[0],sqtop->length);
+        *length=sqtop->length;
+        s=sqtop;
+        sqtop=sqtop->next;
+        if (sqtop==0) sqtail=0;
+        free(s);
+        return 0;
+    }
+    return -1;
+}
+
+struct ipc_message {
+    FTAID receiver;
+    FTAID sender;
+    gs_int32_t operation;
+#ifdef PRINTMSG
+    gs_int32_t pmsgid;
+#endif
+    gs_int32_t  size;
+    gs_int8_t data[4];
+};
+
+#define LOWLEVELOP_ACK 0
+#define LOWLEVELOP_NACK 1
+#define LOWLEVELOP_REGISTER 2
+#define LOWLEVELOP_UNREGISTER 3
+#define LOWLEVELOP_SHM_REGISTER 4
+#define LOWLEVELOP_SHM_FREE 5
+#define LOWLEVELOP_SHM_REMOTE_REGISTER 6
+#define LOWLEVELOP_SHM_REMOTE_FREE 7
+#define LOWLEVELOP_REMOTE_TUPLE 8
+
+struct internal_message{
+    struct ipc_message im;
+    int lowlevelop;
+    key_t shmtoken;
+    int shmsz;
+    int result;
+};
+
+struct internal_remote_tuple{
+    struct ipc_message im;
+    int lowlevelop;
+    int size;
+    gs_int8_t data[4];
+};
+
+FTAID myid;
+int listen_socket;
+
+#ifdef PRINTMSG
+int pmsgid=0;
+#endif
+
+
+struct shmlistentry * shmlist_find(FTAID msgid, int type)
+{
+    int x;
+    for (x=0; x<shmlistlen; x++) {
+        if ((shmlist[x].msgid.ip == msgid.ip)
+            && (shmlist[x].msgid.port == msgid.port)
+            && (shmlist[x].type == type)) {
+            return &(shmlist[x]);
+        }
+    }
+    return 0;
+}
+
+gs_retval_t shmlist_add(FTAID msgid, int type, key_t shmtoken,
+                        int shmid, struct ringbuf * buf, int buffsize)
+{
+    if (shmlist_find(msgid, type) !=0) {
+        return -1;
+    }
+    if (shmlistlen>=MAX_NUMBER_OF_SHM) {
+        gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many"
+              "shm registered\n");
+        return -1;
+    }
+    shmlist[shmlistlen].msgid=msgid;
+    shmlist[shmlistlen].type=type;
+    shmlist[shmlistlen].shmtoken=shmtoken;
+    shmlist[shmlistlen].shmid=shmid;
+    shmlist[shmlistlen].buf=buf;
+    shmlist[shmlistlen].buffsize=buffsize;
+    shmlistlen++;
+    return 0;
+}
+
+static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type)
+{
+    gs_int32_t x;
+    gs_int32_t move=0;
+    for (x=0;x<shmlistlen;x++) {
+        if ((shmlist[x].msgid.ip == msgid.ip)
+            && (shmlist[x].msgid.port == msgid.port)
+            &&(shmlist[x].type==type)) {
+            shmlistlen--;
+            move=1;
+        }
+        if ((move==1)&&(x<shmlistlen)) {
+            shmlist[x]=shmlist[x+1];
+        }
+    }
+    return 0;
+}
+
+
+/* starts the listen socket for the current proccess
+ if the listen_port is 0 then a random port is assigned
+ this function also sets myid
+ */
+static gs_retval_t msg_init(gs_uint32_t clearinghouse) {
+    struct sockaddr_in sin;
+    gs_int32_t x;
+    FILE *f;
+    socklen_t sin_sz;
+    
+    /* mark all entries in the connection hash as unused */
+    for(x=0;x<SOCKET_HASH_SZ;x++) {
+        connectionhash[x].used=0;
+    }
+    
+    myid.index=0;
+    myid.streamid=0;
+    
+    bzero(&sin, sizeof(sin));
+    sin.sin_family = AF_INET;
+#ifdef __NetBSD__
+    sin.sin_len = sizeof(sin);
+#endif
+    sin.sin_addr.s_addr = 0;
+    sin.sin_port = 0;
+    
+    if ((listen_socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not create listen socket\n");
+        return -1;
+    }
+    
+    if (bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin)) < 0 ) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not bind to socket for ip %x port %u with error %u \n",
+              ntohl(sin.sin_addr.s_addr), ntohs(sin.sin_port),errno);
+        return -1;
+    }
+    
+    if (listen(listen_socket, 64) < 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not listen to socket for port %u \n",ntohs(sin.sin_port));
+        close(listen_socket);
+        return -1;
+    }
+    sin_sz=sizeof(sin);
+    if (getsockname(listen_socket, (struct sockaddr *) &sin, &sin_sz) < 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not get local port number of listen socket\n");
+        return -1;
+    }
+    
+    myid.port=ntohs(sin.sin_port);
+    myid.ip=ntohl(sin.sin_addr.s_addr);
+    
+    return 0;
+}
+
+static void closeconnection(gs_int32_t x) {
+    if (connectionhash[x].used==1)  {
+        close(connectionhash[x].socket);
+        connectionhash[x].used=0;
+    }
+}
+
+static gs_retval_t writeall(gs_int32_t socket, void * b, gs_int32_t sz) {
+    gs_int32_t rv;
+    gs_sp_t  buf = (gs_sp_t )b;
+    gs_int32_t res=sz;
+    while(sz>0) {
+        if ((rv=write(socket,buf,sz))<0) {
+            if (errno == EINTR)
+                continue;
+            else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS
+                return 0;
+            else
+                return -1;
+        }
+        sz-=rv;
+        buf+=rv;
+    }
+    return res;
+}
+
+static gs_retval_t msg_send(FTAID id, gs_sp_t  buf, gs_uint32_t len, gs_uint32_t block) {
+    struct sockaddr_in sin;
+    gs_int32_t x;
+    gs_int32_t u;
+    gs_int32_t sz;
+    gs_int32_t ret;
+    
+try_send_again:
+    u=-1;
+    for(x=0;x<SOCKET_HASH_SZ;x++) {
+        if (connectionhash[x].used==1) {
+            if ((connectionhash[x].remoteid.ip==id.ip)
+                && (connectionhash[x].remoteid.port==id.port)) {
+                sz=htonl(len);
+                if (block==0) {
+#ifdef __linux__
+                    gs_int32_t datainbuffer;
+                    if (ioctl(connectionhash[x].socket,SIOCOUTQ,&datainbuffer)<1) {
+                        gslog(LOG_EMERG,
+                              "GSCMSGQ::error::could not determin free "
+                              "space in write buffer errno %u\n",errno);
+                        return -1;
+                    }
+                    if ((SOCK_BUF_SZ-datainbuffer) < (len+sizeof(gs_uint32_t))) {
+                        return 1;
+                    }
+                    
+#else
+                    // low water mark in setsockoption is supported
+                    fd_set fs;
+                    gs_int32_t n;
+                    struct timeval tv;
+                    /* since we set the SNDLOWAT to MAXSZ+4 we know that if the write
+                     * select call returns with a 1 for that file descriptor at least that
+                     * much memory is available in the send buffer and we therefore
+                     * won't block sending
+                     */
+                    FD_ZERO(&fs);
+                    FD_SET(connectionhash[x].socket,&fs);
+                    n=connectionhash[x].socket+1;
+                    tv.tv_sec = 0;
+                    tv.tv_usec = 0;
+                    if(select(n,0,&fs,0,&tv)!=1) {
+                        return 1;
+                    }
+#endif
+                }
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"\twriting %u",ntohl(sz));
+#endif
+                ret = writeall(connectionhash[x].socket,&sz,sizeof(gs_uint32_t));
+                if (!ret)
+                    return 1;
+                else if (ret != sizeof(gs_uint32_t)) {
+                    gslog(LOG_EMERG,"GSCMSGQ::error::could not write length\n");
+                    return -1;
+                }
+                ret = writeall(connectionhash[x].socket,buf,len);
+                if (!ret)
+                    return 1;
+                else if (ret != len) {
+                    gslog(LOG_EMERG,"GSCMSGQ::error::could not write message\n");
+                    return -1;
+                }
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"...done\n");
+#endif
+                return 0;
+            }
+        } else {
+            if (u==-1) { u=x; }
+        }
+    }
+    /* ok we don't have a connection make one */
+    if ((u>=SOCKET_HASH_SZ) || (u<0)) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"
+              " TCP connection limit sending %d\n",u);
+        return -1;
+    }
+    connectionhash[u].remoteid=id;
+    connectionhash[u].remoteid.index=0;
+    connectionhash[u].remoteid.streamid=0;
+    sin.sin_family = AF_INET;
+#ifdef __NetBSD__
+    sin.sin_len = sizeof(sin);
+#endif
+    sin.sin_addr.s_addr = htonl(id.ip);
+    sin.sin_port = htons(id.port);
+    if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n");
+        return -1;
+    }
+    if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n");
+        return -1;
+    }
+    sz=SOCK_BUF_SZ;
+    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF,
+                   (gs_sp_t )&sz, sizeof(sz)) != 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n");
+        return -1;
+    }
+    sz=SOCK_BUF_SZ;
+    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF,
+                   (gs_sp_t )&sz, sizeof(sz)) != 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n");
+        return -1;
+    }
+#ifndef __linux__
+    // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead
+    sz=MAXSZ+4;
+    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT,
+                   (gs_sp_t )&sz, sizeof(sz)) != 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno);
+        return -1;
+    }
+#endif
+    sz=1;
+    if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE,
+                   (gs_sp_t )&sz, sizeof(sz)) != 0) {
+        gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n");
+        return -1;
+    }
+    connectionhash[u].used=1;
+    goto try_send_again;
+    return -1;
+}
+
+static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) {
+    gs_int32_t rv;
+    gs_sp_t  buf = (gs_sp_t )b;
+    gs_int32_t res=sz;
+    while(sz>0) {
+        if ((rv=read(socket,buf,sz))<0) {
+            if (errno == EINTR)
+                continue;
+            gslog(LOG_EMERG,"read with error number %u \n",errno);
+            return -1;
+        }
+        if (rv==0) {
+            return 0;
+        }
+        sz-=rv;
+        buf+=rv;
+    }
+    return res;
+}
+
+/* msg_recv return len if data is available -1 for a timeout and -2 for an error */
+static gs_retval_t msg_recv(gs_sp_t  buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) {
+    struct sockaddr_in sin;
+    socklen_t sl;
+    fd_set fs;
+    gs_int32_t x,y;
+    gs_int32_t n;
+    gs_int32_t length;
+    struct timeval tv;
+    gs_int32_t sret;
+    static gs_int32_t last=0;
+    gs_uint32_t sz;
+    
+    if (check_sideque==1) {
+        if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) {
+            return length;
+        }
+        if (x==-2) {
+            gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n");
+            return -2;
+        }
+    }
+    
+read_again:
+    if (block==0) {
+        tv.tv_sec = 0;
+        tv.tv_usec = 0;
+    } else {
+        tv.tv_sec = 0;
+        tv.tv_usec = 100000;
+    }
+    FD_ZERO(&fs);
+    FD_SET(listen_socket,&fs);
+    n=listen_socket;
+    for(x=0;x<SOCKET_HASH_SZ;x++) {
+        if (connectionhash[x].used==1) {
+            FD_SET(connectionhash[x].socket,&fs);
+            if (n<connectionhash[x].socket) {
+                n=connectionhash[x].socket;
+            }
+        }
+    }
+    n=n+1;
+    // now block
+    sret=select(n,&fs,0,0,&tv);
+    if ((sret<0) && (errno!=EINTR)) {
+        gslog(LOG_EMERG,"Select with error %u\n",errno);
+        return -2;
+    }
+    if (sret<=0) {
+        return -1;
+    }
+    if (FD_ISSET(listen_socket,&fs)) {
+        for (x=0;(x<SOCKET_HASH_SZ) && (connectionhash[x].used !=0) ;x++);
+        if (x>=SOCKET_HASH_SZ) {
+            gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"
+                  "TCP connection limit accepting\n");
+            goto read_again;
+        }
+        sl=sizeof(sin);
+        if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl))
+                   < 0) {
+            gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n");
+            goto read_again;
+        }
+        
+        sz=SOCK_BUF_SZ;
+        if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF,
+                       (gs_sp_t )&sz, sizeof(sz)) != 0) {
+            fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n");
+            return -1;
+        }
+        sz=SOCK_BUF_SZ;
+        if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF,
+                       (gs_sp_t )&sz, sizeof(sz)) != 0) {
+            fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n");
+            return -1;
+        }
+        
+        sl=sizeof(sin);
+        if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) {
+            gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n");
+            close(connectionhash[x].socket);
+            goto read_again;
+        }
+        connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr);
+        connectionhash[x].remoteid.port=ntohs(sin.sin_port);
+        connectionhash[x].remoteid.index=0;
+        connectionhash[x].remoteid.streamid=0;
+        connectionhash[x].used=1;
+#ifdef PRINTMSG
+        gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port);
+#endif
+        goto read_again;
+    }
+    for(x=0;x<SOCKET_HASH_SZ;x++) {
+               last=(last+1)%SOCKET_HASH_SZ;
+        if ((connectionhash[last].used==1) && (FD_ISSET(connectionhash[last].socket,&fs))){
+            gs_int32_t rsz;
+#ifdef PRINTMSG
+            gslog(LOG_EMERG,"reading sret:%d block:%u...",sret,block);
+#endif
+            if ((rsz=readall(connectionhash[last].socket,&length,sizeof(gs_uint32_t)))!=sizeof(gs_uint32_t)) {
+                closeconnection(last);
+                gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing %u res %d\n",
+                      connectionhash[last].remoteid.port,rsz);
+                continue;
+            }
+            length=ntohl(length);
+            if (buflen<length) {
+                gs_int8_t d;
+                gslog(LOG_EMERG,"GSCMSGQ::error::message to long (%u) for receive buffer (%u)\n",length,
+                      buflen);
+                /* remove the data */
+                for(y=0;y<length;y++)
+                    if (readall(connectionhash[last].socket,&d,1)!=1) {
+                        closeconnection(last);
+                        gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing for receive buffer mismatch\n");
+                        return -2;
+                    }
+                return -2;
+            }
+            if (readall(connectionhash[last].socket,buf,length)!=length) {
+                closeconnection(last);
+                gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing on data read\n");
+                continue;
+            }
+#ifdef PRINTMSG
+            gslog(LOG_EMERG,"reading done\n");
+#endif
+            return length;
+        }
+    }
+    return -1;
+}
+
+static gs_retval_t send_nack(FTAID recid) {
+    struct internal_message i;
+    i.im.receiver = recid;
+    i.im.sender = myid;
+    i.im.operation = RESERVED_FOR_LOW_LEVEL;
+    i.im.size = sizeof(struct internal_message);
+    i.lowlevelop = LOWLEVELOP_NACK;
+    if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {
+        return 0;
+    } else {
+        return -1;
+    }
+    return -1;
+}
+
+static gs_retval_t send_ack(FTAID recid) {
+    struct internal_message i;
+    i.im.receiver = recid;
+    i.im.sender = myid;
+    i.im.operation = RESERVED_FOR_LOW_LEVEL;
+    i.im.size = sizeof(struct internal_message);
+    i.lowlevelop = LOWLEVELOP_ACK;
+    if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {
+        return 0;
+    } else {
+        return -1;
+    }
+    return -1;
+}
+
+static gs_retval_t wait_for_lowlevel_ack() {
+    gs_int8_t b[MAXMSGSZ];
+    struct internal_message * i;
+    gs_int32_t res;
+    i=(struct internal_message *)b;
+    
+    /* this is bussy waiting if there is another
+     message waiting to be processed so make sure it is only
+     used where bussy waiting is OK. If that becomes
+     a probelm add a local queue
+     */
+    
+    while( 1==1) {
+        if ((res=msg_recv(b, MAXMSGSZ,1,0))>0) {
+            if ((i->im.operation == RESERVED_FOR_LOW_LEVEL)
+                && (( i->lowlevelop ==  LOWLEVELOP_ACK)
+                    || ( i->lowlevelop ==  LOWLEVELOP_NACK))) {
+                    if ( i->lowlevelop ==  LOWLEVELOP_ACK) {
+                        return 0;
+                    } else {
+                        return 1;
+                    }
+                }
+                       if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) {
+                               struct internal_remote_tuple * it;
+                               struct shmlistentry * s;
+                               it=(struct internal_remote_tuple *)b;
+                
+                               if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {
+#ifdef PRINTMSG
+                                       gslog(LOG_EMERG,"Received remote ringbuf message "
+                          "for message of size %u\n",
+                          it->size,it->im.size);
+#endif
+#ifdef BLOCKRINGBUFFER
+                                       while (SPACETOWRITE(s->buf)==0) {
+                                               usleep(1000);
+                                               gslog(LOG_ERR,"Dead in the water we can't "
+                              "drain the ringbuffer we wait for.");
+                                       }
+                                       memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
+                                       ADVANCEWRITE(s->buf);
+#else
+                                       if (SPACETOWRITE(s->buf)) {
+                                               memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
+                                               ADVANCEWRITE(s->buf);
+                                       } else {
+                        //                                             gslog(LOG_EMERG,"r+");
+                                       }
+#endif
+                               } else {
+                                       gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");
+                               }
+                       } else {
+                               gscpipc_sidequeue_append(b, res);
+                       }
+        }  else {
+            if (res < -1) {
+                /* got an error here */
+                gslog(LOG_EMERG,"hostlib::error::received error "
+                      "during wait for low level ack\n");
+                return -1;
+            }
+        }
+    }
+    /* never reached */
+    return -1;
+}
+
+void shmlist_drain_remote()
+{
+    gs_int32_t x;
+    gs_int8_t buf[MAXSZ];
+    struct internal_remote_tuple * it;
+    it = (struct internal_remote_tuple *) buf;
+    for (x=0; x<shmlistlen; x++) {
+        if ((shmlist[x].msgid.ip != myid.ip)&& (shmlist[x].type==SHM_SEND)) {
+            while (UNREAD(shmlist[x].buf)) {
+                it->im.receiver = shmlist[x].msgid;
+                it->im.sender = myid;
+                it->im.operation = RESERVED_FOR_LOW_LEVEL;
+                it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE;
+                it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1;
+                it->im.size = sizeof(struct internal_remote_tuple)-4+it->size;
+                memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size);
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n",
+                      it->size, it->im.size);
+#endif
+                if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) {
+                    break;
+                }
+                ADVANCEREAD(shmlist[x].buf);
+            }
+        }
+    }
+}
+
+/*
+ *used to contact the clearinghouse process returns the MSGID of
+ * the current process
+ */
+gs_retval_t gscpipc_init(gs_int32_t clearinghouse)
+{
+    struct internal_message i;
+    key_t msqtoken=0;
+    gs_int32_t x;
+    endpoint gshub;
+    endpoint tmpclearinghouse;
+    
+    /* make sure priveleges can be set */
+    umask(0);
+    
+    clearinghouseftaid.index=0;
+    clearinghouseftaid.streamid=0;
+    
+    if (get_hub(&gshub)!=0) {
+        gslog(LOG_EMERG,"hostlib::error::could get hub\n");
+        return -1;
+    }
+    
+    if (clearinghouse!=0) {
+        // This is the clearinghouse
+        gs_int8_t buf[MAXMSGSZ];
+        
+        if (msg_init(1)<0) {
+            gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");
+            return -1;
+        }
+        
+        clearinghouseftaid.ip=myid.ip;
+        clearinghouseftaid.port=myid.port;
+        
+        tmpclearinghouse.ip=htonl(clearinghouseftaid.ip);
+        tmpclearinghouse.port=htons(clearinghouseftaid.port);
+        
+        if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) {
+            gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance");
+            return -1;
+        }
+        
+        return 0;
+        
+    } else {
+        // This is an lfta/hfta/app
+        gs_int32_t res;
+        
+        if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) {
+            gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n");
+            return -1;
+        }
+        
+        clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip);
+        clearinghouseftaid.port=ntohs(tmpclearinghouse.port);
+        
+        
+        if (msg_init(0)<0) {
+            gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");
+            return -1;
+        }
+        
+        i.im.receiver = clearinghouseftaid;
+        i.im.sender = myid;
+        i.im.operation = RESERVED_FOR_LOW_LEVEL;
+        i.im.size = sizeof(struct internal_message);
+        i.lowlevelop = LOWLEVELOP_REGISTER;
+#ifdef PRINTMSG
+        i.im.pmsgid=pmsgid;
+        pmsgid++;
+        gslog(LOG_EMERG,"send a message (%d.%u) to %u with op "
+              "%u with size %u\n",
+              i.im.pmsgid,i.im.receiver.port,i.im.operation,
+              i.im.size);
+#endif
+        if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) {
+            /* we can wait her for an ack since nobody should know
+             about us yet */
+        init_read_again:
+            if ((res=msg_recv((gs_sp_t )&i,
+                              sizeof(struct internal_message),
+                              1,0))>=0) {
+                if (i.lowlevelop == LOWLEVELOP_ACK) {
+                    return 0;
+                } else {
+                    gslog(LOG_EMERG,"hostlib::error::received unexpected message "
+                          "during initalization\n");
+                    return -1;
+                }
+            } else {
+                if (res<-1) {
+                    /* got an error here */
+                    gslog(LOG_EMERG,"hostlib::error::received error message "
+                          "during initalization\n");
+                    return -1;
+                } else {
+                    goto init_read_again;
+                }
+            }
+        }
+        gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n");
+        return -1;
+    }
+    return 0;
+}
+
+
+static gs_retval_t gscpdetachshm(FTAID target)
+{
+    struct internal_message i;
+    
+    i.im.receiver = target;
+    i.im.sender = myid;
+    i.im.operation = RESERVED_FOR_LOW_LEVEL;
+    i.im.size = sizeof(struct internal_message);
+    i.lowlevelop = LOWLEVELOP_SHM_FREE;
+    if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) {
+        /* no reason to wait here won't be acked anyway */
+        return -1;
+    }
+    return 0;
+}
+
+static gs_retval_t gscpdetachsocket(FTAID target)
+{
+    struct internal_message i;
+    
+    i.im.receiver = target;
+    i.im.sender = myid;
+    i.im.operation = RESERVED_FOR_LOW_LEVEL;
+    i.im.size = sizeof(struct internal_message);
+    i.lowlevelop = LOWLEVELOP_UNREGISTER;
+    if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) {
+        /* no reason to wait here won't be acked anyway */
+        return -1;
+    }
+    return -1;
+}
+
+
+/* used to disassociate process from clearinghouse */
+gs_retval_t gscpipc_free()
+{
+    gs_int32_t x;
+    /* XXX OS if this function is called when there are still
+     subscribed FTAs for this process the clearinghouse will
+     crash
+     */
+    
+    if (clearinghouse!=0) {
+        return 0;
+    } else {
+        gs_int32_t x;
+        for (x=0; x<shmlistlen; x++) {
+            if (shmlist[x].type==SHM_RECV) {
+                gscpdetachshm(shmlist[x].msgid);
+                if (shmdt((gs_sp_t )shmlist[x].buf)!=0) {
+                    gslog(LOG_EMERG,"hostlib::error::could not "
+                          "detach shared memory\n");
+                }
+            } else {
+                gslog(LOG_EMERG,"hostlib::error::porccess freed while still "
+                      "attached to sending shared memory\n");
+            }
+        }
+    }
+    /* remove connection */
+    for(x=0;x<SOCKET_HASH_SZ;x++) {
+        if (connectionhash[x].used==1) {
+            //XXX detach does not work due to interleaved messages
+            // gscpdetachsocket(connectionhash[x].remoteid);
+            close(connectionhash[x].socket);
+        }
+    }
+    close(listen_socket);
+    return 0;
+}
+
+/* returns MSGID of current process */
+FTAID gscpipc_getftaid()
+{
+    return myid;
+}
+
+
+/* sends a message to a process */
+gs_retval_t gscpipc_send(FTAID f, gs_int32_t operation, gs_sp_t  buf, gs_int32_t length, gs_int32_t block)
+{
+    gs_int8_t b[MAXMSGSZ];
+    struct ipc_message * i;
+    struct shmlistentry * s;
+    if (length > MAXMSGSZ) {
+        gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n");
+        return -1;
+    }
+    i = (struct ipc_message *) b;
+    i->receiver=f;
+    i->sender=myid;
+    i->operation=operation;
+    i->size=length+sizeof(struct ipc_message);
+    memcpy(&i->data[0],buf,length);
+#ifdef PRINTMSG
+    i->pmsgid=pmsgid;
+    pmsgid++;
+    gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n",
+          i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length);
+#endif
+    if ((s=shmlist_find(f, SHM_RECV))!=0) {
+        // set the hint in the ringbuffer that there is something on the shared memory queue
+        s->buf->mqhint=1;
+    }
+    if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) {
+        gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno);
+        return -1;
+    }
+    return 0;
+}
+
+/* retrieve a message buf has to be at least of size MAXMSGSZ*/
+gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t  buf, gs_int32_t * size, gs_int32_t block)
+{
+    gs_int32_t w;
+    gs_int32_t x;
+    gs_int8_t b[MAXSZ];
+       gs_int32_t y;
+    
+    struct internal_message * i;
+    struct internal_remote_tuple * it;
+    gs_int32_t length;
+    
+    struct shmlistentry * s;
+    
+    i=(struct internal_message *)b;
+    it=(struct internal_remote_tuple *)b;
+    
+    for(y=0;(y < 10) || (block==1);y++) {
+        shmlist_drain_remote();
+               length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1);
+               if (length < -1) {
+                       /* problem */
+                       return -1;
+               }
+               if (length < 0) {
+                       /* we are nonblocking and have nothing to do */
+            if (block==1) {
+                // we are expected to block for ever if it is 0 or 2 we return
+                continue;
+            }
+                       return 0;
+               }
+#ifdef PRINTMSG
+               gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n",
+              i->im.pmsgid,  i->im.sender, i->im.sender,i->im.operation,i->im.size);
+#endif
+               if (length <0) {
+                       gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno);
+                       return -1;
+               }
+               if (i->im.operation != RESERVED_FOR_LOW_LEVEL) {
+                       memcpy(buf,&(i->im.data[0]),i->im.size);
+                       *size=i->im.size-sizeof(struct ipc_message);
+                       *operation=i->im.operation;
+                       *f=i->im.sender;
+                       if ((s=shmlist_find(*f, SHM_SEND))!=0) {
+                               // clear the hint in the ringbuffer to indicate we got the message
+                s->buf->mqhint=0;
+                       }
+                       return 1;
+               }
+               switch (i->lowlevelop) {
+            case LOWLEVELOP_REGISTER:
+                /* this should only get called if the process is the clearinghouse */
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port);
+#endif
+                send_ack(i->im.sender);
+                break;
+            case LOWLEVELOP_UNREGISTER:
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port);
+#endif    /* remove connection */
+                for(x=0;x<SOCKET_HASH_SZ;x++) {
+                    if ( (connectionhash[x].used==1)
+                        && (connectionhash[x].remoteid.ip==i->im.sender.ip)
+                        && (connectionhash[x].remoteid.port==i->im.sender.port)) {
+                       gslog(LOG_EMERG,"Close by remote request %u\n",
+                              connectionhash[x].remoteid.port);
+                        // XXX closed when the other process dies
+                        // can't close it yet since we might have
+                        // some more messages
+                        // close(connectionhash[x].socket);
+                        connectionhash[x].used=0;
+                    }
+                }
+                break;
+            case LOWLEVELOP_SHM_REGISTER:
+            {
+                gs_int32_t shmid;
+                struct ringbuf * r;
+                struct shmid_ds sms;
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n",
+                      i->im.sender.port,
+                      i->shmtoken,i->shmsz);
+#endif
+                if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) {
+                    if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) {
+                        gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno);
+                        send_nack(i->im.sender);
+                    } else {
+                        // Make sure all the momory gets mapped now
+                                       for(x=0;x<length;x=x+1024) {
+                            ((gs_uint8_t *) r)[x]=0;
+                                       }
+                        
+#ifdef PRINTMSG
+                        gslog(LOG_EMERG,"Got a ring buffer at address %p (%u %u %u %u)\n"
+                              ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);
+#endif
+                        if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken,
+                                        shmid,r,i->shmsz)<0) {
+                            shmdt((gs_sp_t )r);
+                            shmctl(shmid,IPC_RMID,&sms);
+                            gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n");
+                            send_nack(i->im.sender);
+                        } else {
+                            send_ack(i->im.sender);
+                        }
+                    }
+                } else {
+                    gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno);
+                    send_nack(i->im.sender);
+                }
+            }
+                break;
+            case LOWLEVELOP_SHM_REMOTE_REGISTER:
+            {
+                gs_int32_t shmid;
+                struct ringbuf * r;
+                struct shmid_ds sms;
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"request to get remote shm %u  size %u\n",
+                      i->im.sender.port,i->shmsz);
+#endif
+                if ((r=(struct ringbuf *)malloc(i->shmsz))==0) {
+                    gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno);
+                    send_nack(i->im.sender);
+                } else {
+                    // make sure all the memory gets mapped now
+                    for(x=0;x<length;x=x+1024) {
+                        ((gs_uint8_t *) r)[x]=0;
+                    }
+                    r->reader=0;
+                    r->writer=0;
+                    r->length=i->shmsz;
+                    r->end= i->shmsz-MAXTUPLESZ;
+                    r->destid=i->im.receiver;
+                    r->srcid=i->im.sender;
+#ifdef PRINTMSG
+                    gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n"
+                          ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);
+#endif
+                    if (shmlist_add(i->im.sender,SHM_SEND,0,
+                                    0,r,i->shmsz)<0) {
+                        gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n");
+                        send_nack(i->im.sender);
+                    } else {
+                        send_ack(i->im.sender);
+                    }
+                }
+            }
+                break;
+            case LOWLEVELOP_SHM_FREE:
+            {
+                struct shmlistentry * sm;
+                struct shmid_ds sms;
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);
+#endif
+                if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {
+#ifdef PRINTMSG
+                    gslog(LOG_EMERG,"freeing %u",sm->shmid);
+#endif
+                    shmdt((gs_sp_t )sm->buf);
+                    shmctl(sm->shmid,IPC_RMID,&sms);
+                    shmlist_rm(i->im.sender,SHM_SEND);
+                }
+            }
+                break;
+            case LOWLEVELOP_SHM_REMOTE_FREE:
+            {
+                struct shmlistentry * sm;
+                struct shmid_ds sms;
+#ifdef PRINTMSG
+                gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);
+#endif
+                if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {
+#ifdef PRINTMSG
+                    gslog(LOG_EMERG,"freeing %u",sm->shmid);
+#endif
+                    free((gs_sp_t ) sm->buf);
+                    shmlist_rm(i->im.sender,SHM_SEND);
+                }
+            }
+                break;
+            case LOWLEVELOP_REMOTE_TUPLE:
+            {
+                if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {
+#ifdef PRINTMSG
+                    gslog(LOG_EMERG,"Received remote ringbuf message "
+                          "for message of size %u\n",
+                          it->size,it->im.size);
+#endif
+#ifdef BLOCKRINGBUFFER
+                    while (SPACETOWRITE(s->buf)==0) {
+                        usleep(1000);
+                        gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for.");
+                    }
+                    memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
+                    ADVANCEWRITE(s->buf);
+#else
+                    if (SPACETOWRITE(s->buf)) {
+                        memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
+                        ADVANCEWRITE(s->buf);
+                    } else {
+                        //                   gslog(LOG_EMERG,"r+");
+                    }
+#endif
+                } else {
+                    gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");
+                }
+            }
+                break;
+            default:
+                gslog(LOG_EMERG,"hostlib::error::unexpected message received\n");
+                return -1;
+               }
+    }
+    return 0;
+}
+
+/* allocate a ringbuffer which allows receiving data from
+ * the other process. returns 0 if didn't succeed and
+ * returns an existing buffer if it exists */
+struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length)
+{
+    struct shmid_ds sms;
+    struct shmlistentry * se;
+    gs_int8_t keybuf[1024];
+    key_t shmtoken=0;
+    gs_int32_t shmid=0;
+    struct ringbuf * r;
+    struct internal_message i;
+    gs_int32_t x;
+    
+    se = shmlist_find(f, SHM_RECV);
+    if (se) {
+        return se->buf;
+    }
+    
+    if (length<(4*MAXTUPLESZ)) {
+        gslog(LOG_EMERG,
+              "ERROR:buffersize in gscpipc_createshm  has to be "
+              "at least %u Bytes long\n",
+              4*MAXTUPLESZ);
+        return 0;
+    }
+    
+    if (myid.ip == f.ip) {
+        sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port);
+        if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1)  {
+            gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n");
+            return 0;
+        }
+        close(x);
+        
+        if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) {
+            gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n");
+            return 0;
+        }
+        
+        if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL|
+                                        IPC_CREAT|IPC_EXCL))==-1) {
+            gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno);
+            return 0;
+        }
+        if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) {
+            gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n");
+            return 0;
+        }
+        /* touch all memory once to map/reserve it now */
+        for(x=0;x<length;x=x+1024) {
+            ((gs_uint8_t *) r)[x]=0;
+        }
+        r->reader=0;
+        r->writer=0;
+        r->length=length;
+        r->end= r->length-MAXTUPLESZ;
+        r->destid=f;
+        r->srcid=myid;
+        i.im.receiver = f;
+        i.im.sender = myid;
+        i.im.operation = RESERVED_FOR_LOW_LEVEL;
+        i.im.size = sizeof(struct internal_message);
+        i.lowlevelop = LOWLEVELOP_SHM_REGISTER;
+        i.shmtoken = shmtoken;
+        i.shmsz=length;
+        if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {
+            if (wait_for_lowlevel_ack()<0) {
+                shmdt((gs_sp_t )r);
+                shmctl(shmid,IPC_RMID,&sms);
+                return 0;
+            }
+        }
+        shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */  
+        if (shmlist_add(f, SHM_RECV, shmtoken, 
+                        shmid, r, length) <0) {
+            shmdt((gs_sp_t )r);
+            shmctl(shmid,IPC_RMID,&sms);
+            i.im.receiver = f;
+            i.im.sender = myid;
+            i.im.operation = RESERVED_FOR_LOW_LEVEL;
+            i.im.size = sizeof(struct internal_message);
+            i.lowlevelop = LOWLEVELOP_SHM_FREE;
+            i.shmtoken = shmtoken;
+            msg_send(f,(gs_sp_t )&i,i.im.size,1);
+            return 0;
+        }
+    } else {
+        /* remote shared memory */
+        if ((r=(struct ringbuf *)malloc(length))==0) {
+            gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n");
+            return 0;
+        }
+        /* touch all memory once to map/reserve it now */
+        for(x=0;x<length;x=x+1024) {
+            ((gs_uint8_t *) r)[x]=0;
+        }
+        r->reader=0;
+        r->writer=0;
+        r->length=length;
+        r->end= r->length-MAXTUPLESZ;
+        r->destid=f;
+        r->srcid=myid;
+        i.im.receiver = f;
+        i.im.sender = myid;
+        i.im.operation = RESERVED_FOR_LOW_LEVEL;
+        i.im.size = sizeof(struct internal_message);
+        i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER;
+        i.shmtoken = shmtoken;
+        i.shmsz=length;
+        if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {
+            if (wait_for_lowlevel_ack()<0) {
+                free(r);
+                return 0;
+            }
+        }
+        if (shmlist_add(f, SHM_RECV, 0, 
+                        0, r, length) <0) {
+            free(r);
+            i.im.receiver = f;
+            i.im.sender = myid;
+            i.im.operation = RESERVED_FOR_LOW_LEVEL;
+            i.im.size = sizeof(struct internal_message);
+            i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;
+            msg_send(f,(gs_sp_t )&i,i.im.size,1);
+            return 0;
+        }
+    }
+    return r;
+       
+}
+
+/* finds a ringbuffer to send which was allocated by
+ * gscpipc_creatshm and return 0 on an error */
+
+struct ringbuf * gscpipc_getshm(FTAID f) 
+{
+    struct shmlistentry * se;
+    gs_int32_t recmsgid;
+    se = shmlist_find(f, SHM_SEND);
+    if (se) {
+        return se->buf;
+    }
+    return 0;
+}     
+
+/* frees shared memory to a particular proccess identified
+ * by MSGID
+ */ 
+gs_retval_t gscpipc_freeshm(FTAID f)
+{
+    struct internal_message i;
+    struct shmlistentry * sm;
+    struct shmid_ds sms;
+    if (myid.ip == f.ip) { 
+        if ((sm=shmlist_find(f, SHM_RECV)) <0) {
+            shmdt((gs_sp_t )sm->buf);
+            shmctl(sm->shmid,IPC_RMID,&sms);
+            i.im.receiver = f;
+            i.im.sender = myid;
+            i.im.operation = RESERVED_FOR_LOW_LEVEL;
+            i.im.size = sizeof(struct internal_message);
+            i.lowlevelop = LOWLEVELOP_SHM_FREE;
+            i.shmtoken = sm->shmtoken;
+            msg_send(f,(gs_sp_t )&i,i.im.size,1);
+            shmlist_rm(f, SHM_RECV);
+        }
+    } else {
+        if ((sm=shmlist_find(f, SHM_RECV)) <0) {
+            free((gs_sp_t )sm->buf);
+            i.im.receiver = f;
+            i.im.sender = myid;
+            i.im.operation = RESERVED_FOR_LOW_LEVEL;
+            i.im.size = sizeof(struct internal_message);
+            i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;
+            msg_send(f,(gs_sp_t )&i,i.im.size,1);
+            shmlist_rm(f, SHM_RECV);
+        }
+    }    
+    return 0;
+}
+
+gs_retval_t gscpipc_mqhint()
+{
+    gs_int32_t x;
+    for (x=0; x<shmlistlen; x++) {
+        if (shmlist[x].type == SHM_SEND) {
+            if (shmlist[x].buf->mqhint) {
+                return 1;
+            }
+        }
+    }
+    return 0;
+}