X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Flib%2Fgscphost%2Fgscpipc.c;fp=src%2Flib%2Fgscphost%2Fgscpipc.c;h=502c54ca05435de3affe202cbd4eb77d6b1061e0;hb=e981e864b812c938d3df8b555b6bb98bb89273e7;hp=bee7b39d7c1a76eb92ac54fb2661b4636e85b02d;hpb=44ea17511358ebc75952066580e31cba8b38ddb8;p=com%2Fgs-lite.git diff --git a/src/lib/gscphost/gscpipc.c b/src/lib/gscphost/gscpipc.c index bee7b39..502c54c 100644 --- a/src/lib/gscphost/gscpipc.c +++ b/src/lib/gscphost/gscpipc.c @@ -1,1344 +1,1344 @@ -/* ------------------------------------------------ - Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ -#include "gsconfig.h" -#include "gstypes.h" -#include "gscpipc.h" -#include "gshub.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#ifdef __linux__ -#include -#include -#include -#endif - -#ifndef socklen_t -#define socklen_t gs_uint32_t -#endif - - -struct FTAID clearinghouseftaid; - - - -struct connection { - gs_int32_t socket; /* socket for connection */ - gs_int32_t used; /* 1 if the entry is in use */ - FTAID remoteid; /* remoteid of connection */ -}; - -struct connection connectionhash[SOCKET_HASH_SZ]; - -gs_int32_t clearinghouse=0; - -#define SHMTYPE 's' -#define SHM_RECV 1 -#define SHM_SEND 2 - -struct shmlistentry { - FTAID msgid; - gs_int32_t type; - key_t shmtoken; - gs_int32_t shmid; - struct ringbuf * buf; - gs_int32_t buffsize; -}; - -gs_int32_t shmlistlen=0; -struct shmlistentry shmlist[MAX_NUMBER_OF_SHM]; - -struct sq { - gs_int8_t buf[MAXMSGSZ]; - gs_int32_t length; - struct sq * next; -}; - -static struct sq * sqtop=0; -static struct sq * sqtail=0; - -/* adds a buffer to the end of the sidequeue*/ -gs_retval_t gscpipc_sidequeue_append(gs_sp_t buf, gs_int32_t length) -{ - struct sq * s; - if ((s=malloc(sizeof(struct sq)))==0) { - gslog(LOG_EMERG,"Could not allocate memory for sidequeue"); - return -1; - } - memcpy(&s->buf[0],buf,MAXMSGSZ); - s->length=length; - s->next=0; - if (sqtail) { - sqtail->next=s; - sqtail=s; - } else { - sqtop = s; - sqtail = s; - } - return 0; -} - -/* removes a buffer from the top of the sidequeue*/ -gs_retval_t gscpipc_sidequeue_pop(gs_sp_t buf, gs_int32_t * length, gs_int32_t buflen) -{ - struct sq * s; - if (sqtop) { - if (sqtop->length > buflen) { - return -2; - } - memcpy(buf,&sqtop->buf[0],sqtop->length); - *length=sqtop->length; - s=sqtop; - sqtop=sqtop->next; - if (sqtop==0) sqtail=0; - free(s); - return 0; - } - return -1; -} - -struct ipc_message { - FTAID receiver; - FTAID sender; - gs_int32_t operation; -#ifdef PRINTMSG - gs_int32_t pmsgid; -#endif - gs_int32_t size; - gs_int8_t data[4]; -}; - -#define LOWLEVELOP_ACK 0 -#define LOWLEVELOP_NACK 1 -#define LOWLEVELOP_REGISTER 2 -#define LOWLEVELOP_UNREGISTER 3 -#define LOWLEVELOP_SHM_REGISTER 4 -#define LOWLEVELOP_SHM_FREE 5 -#define LOWLEVELOP_SHM_REMOTE_REGISTER 6 -#define LOWLEVELOP_SHM_REMOTE_FREE 7 -#define LOWLEVELOP_REMOTE_TUPLE 8 - -struct internal_message{ - struct ipc_message im; - int lowlevelop; - key_t shmtoken; - int shmsz; - int result; -}; - -struct internal_remote_tuple{ - struct ipc_message im; - int lowlevelop; - int size; - gs_int8_t data[4]; -}; - -FTAID myid; -int listen_socket; - -#ifdef PRINTMSG -int pmsgid=0; -#endif - - -struct shmlistentry * shmlist_find(FTAID msgid, int type) -{ - int x; - for (x=0; x=MAX_NUMBER_OF_SHM) { - gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many" - "shm registered\n"); - return -1; - } - shmlist[shmlistlen].msgid=msgid; - shmlist[shmlistlen].type=type; - shmlist[shmlistlen].shmtoken=shmtoken; - shmlist[shmlistlen].shmid=shmid; - shmlist[shmlistlen].buf=buf; - shmlist[shmlistlen].buffsize=buffsize; - shmlistlen++; - return 0; -} - -static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type) -{ - gs_int32_t x; - gs_int32_t move=0; - for (x=0;x0) { - if ((rv=write(socket,buf,sz))<0) { - if (errno == EINTR) - continue; - else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS - return 0; - else - return -1; - } - sz-=rv; - buf+=rv; - } - return res; -} - -static gs_retval_t msg_send(FTAID id, gs_sp_t buf, gs_uint32_t len, gs_uint32_t block) { - struct sockaddr_in sin; - gs_int32_t x; - gs_int32_t u; - gs_int32_t sz; - gs_int32_t ret; - -try_send_again: - u=-1; - for(x=0;x=SOCKET_HASH_SZ) || (u<0)) { - gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum" - " TCP connection limit sending %d\n",u); - return -1; - } - connectionhash[u].remoteid=id; - connectionhash[u].remoteid.index=0; - connectionhash[u].remoteid.streamid=0; - sin.sin_family = AF_INET; -#ifdef __NetBSD__ - sin.sin_len = sizeof(sin); -#endif - sin.sin_addr.s_addr = htonl(id.ip); - sin.sin_port = htons(id.port); - if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n"); - return -1; - } - if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n"); - return -1; - } - sz=SOCK_BUF_SZ; - if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF, - (gs_sp_t )&sz, sizeof(sz)) != 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n"); - return -1; - } - sz=SOCK_BUF_SZ; - if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF, - (gs_sp_t )&sz, sizeof(sz)) != 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n"); - return -1; - } -#ifndef __linux__ - // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead - sz=MAXSZ+4; - if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT, - (gs_sp_t )&sz, sizeof(sz)) != 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno); - return -1; - } -#endif - sz=1; - if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE, - (gs_sp_t )&sz, sizeof(sz)) != 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n"); - return -1; - } - connectionhash[u].used=1; - goto try_send_again; - return -1; -} - -static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) { - gs_int32_t rv; - gs_sp_t buf = (gs_sp_t )b; - gs_int32_t res=sz; - while(sz>0) { - if ((rv=read(socket,buf,sz))<0) { - if (errno == EINTR) - continue; - gslog(LOG_EMERG,"read with error number %u \n",errno); - return -1; - } - if (rv==0) { - return 0; - } - sz-=rv; - buf+=rv; - } - return res; -} - -/* msg_recv return len if data is available -1 for a timeout and -2 for an error */ -static gs_retval_t msg_recv(gs_sp_t buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) { - struct sockaddr_in sin; - socklen_t sl; - fd_set fs; - gs_int32_t x,y; - gs_int32_t n; - gs_int32_t length; - struct timeval tv; - gs_int32_t sret; - static gs_int32_t last=0; - gs_uint32_t sz; - - if (check_sideque==1) { - if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) { - return length; - } - if (x==-2) { - gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n"); - return -2; - } - } - -read_again: - if (block==0) { - tv.tv_sec = 0; - tv.tv_usec = 0; - } else { - tv.tv_sec = 0; - tv.tv_usec = 100000; - } - FD_ZERO(&fs); - FD_SET(listen_socket,&fs); - n=listen_socket; - for(x=0;x=SOCKET_HASH_SZ) { - gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum" - "TCP connection limit accepting\n"); - goto read_again; - } - sl=sizeof(sin); - if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl)) - < 0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n"); - goto read_again; - } - - sz=SOCK_BUF_SZ; - if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF, - (gs_sp_t )&sz, sizeof(sz)) != 0) { - fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n"); - return -1; - } - sz=SOCK_BUF_SZ; - if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF, - (gs_sp_t )&sz, sizeof(sz)) != 0) { - fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n"); - return -1; - } - - sl=sizeof(sin); - if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) { - gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n"); - close(connectionhash[x].socket); - goto read_again; - } - connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr); - connectionhash[x].remoteid.port=ntohs(sin.sin_port); - connectionhash[x].remoteid.index=0; - connectionhash[x].remoteid.streamid=0; - connectionhash[x].used=1; -#ifdef PRINTMSG - gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port); -#endif - goto read_again; - } - for(x=0;x0) { - if ((i->im.operation == RESERVED_FOR_LOW_LEVEL) - && (( i->lowlevelop == LOWLEVELOP_ACK) - || ( i->lowlevelop == LOWLEVELOP_NACK))) { - if ( i->lowlevelop == LOWLEVELOP_ACK) { - return 0; - } else { - return 1; - } - } - if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) { - struct internal_remote_tuple * it; - struct shmlistentry * s; - it=(struct internal_remote_tuple *)b; - - if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) { -#ifdef PRINTMSG - gslog(LOG_EMERG,"Received remote ringbuf message " - "for message of size %u\n", - it->size,it->im.size); -#endif -#ifdef BLOCKRINGBUFFER - while (SPACETOWRITE(s->buf)==0) { - usleep(1000); - gslog(LOG_ERR,"Dead in the water we can't " - "drain the ringbuffer we wait for."); - } - memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); - ADVANCEWRITE(s->buf); -#else - if (SPACETOWRITE(s->buf)) { - memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); - ADVANCEWRITE(s->buf); - } else { - // gslog(LOG_EMERG,"r+"); - } -#endif - } else { - gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n"); - } - } else { - gscpipc_sidequeue_append(b, res); - } - } else { - if (res < -1) { - /* got an error here */ - gslog(LOG_EMERG,"hostlib::error::received error " - "during wait for low level ack\n"); - return -1; - } - } - } - /* never reached */ - return -1; -} - -void shmlist_drain_remote() -{ - gs_int32_t x; - gs_int8_t buf[MAXSZ]; - struct internal_remote_tuple * it; - it = (struct internal_remote_tuple *) buf; - for (x=0; xim.receiver = shmlist[x].msgid; - it->im.sender = myid; - it->im.operation = RESERVED_FOR_LOW_LEVEL; - it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE; - it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1; - it->im.size = sizeof(struct internal_remote_tuple)-4+it->size; - memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size); -#ifdef PRINTMSG - gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n", - it->size, it->im.size); -#endif - if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) { - break; - } - ADVANCEREAD(shmlist[x].buf); - } - } - } -} - -/* - *used to contact the clearinghouse process returns the MSGID of - * the current process - */ -gs_retval_t gscpipc_init(gs_int32_t clearinghouse) -{ - struct internal_message i; - key_t msqtoken=0; - gs_int32_t x; - endpoint gshub; - endpoint tmpclearinghouse; - - /* make sure priveleges can be set */ - umask(0); - - clearinghouseftaid.index=0; - clearinghouseftaid.streamid=0; - - if (get_hub(&gshub)!=0) { - gslog(LOG_EMERG,"hostlib::error::could get hub\n"); - return -1; - } - - if (clearinghouse!=0) { - // This is the clearinghouse - gs_int8_t buf[MAXMSGSZ]; - - if (msg_init(1)<0) { - gslog(LOG_EMERG,"hostlib::error::could not init msgq\n"); - return -1; - } - - clearinghouseftaid.ip=myid.ip; - clearinghouseftaid.port=myid.port; - - tmpclearinghouse.ip=htonl(clearinghouseftaid.ip); - tmpclearinghouse.port=htons(clearinghouseftaid.port); - - if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) { - gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance"); - return -1; - } - - return 0; - - } else { - // This is an lfta/hfta/app - gs_int32_t res; - - if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) { - gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n"); - return -1; - } - - clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip); - clearinghouseftaid.port=ntohs(tmpclearinghouse.port); - - - if (msg_init(0)<0) { - gslog(LOG_EMERG,"hostlib::error::could not init msgq\n"); - return -1; - } - - i.im.receiver = clearinghouseftaid; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_REGISTER; -#ifdef PRINTMSG - i.im.pmsgid=pmsgid; - pmsgid++; - gslog(LOG_EMERG,"send a message (%d.%u) to %u with op " - "%u with size %u\n", - i.im.pmsgid,i.im.receiver.port,i.im.operation, - i.im.size); -#endif - if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) { - /* we can wait her for an ack since nobody should know - about us yet */ - init_read_again: - if ((res=msg_recv((gs_sp_t )&i, - sizeof(struct internal_message), - 1,0))>=0) { - if (i.lowlevelop == LOWLEVELOP_ACK) { - return 0; - } else { - gslog(LOG_EMERG,"hostlib::error::received unexpected message " - "during initalization\n"); - return -1; - } - } else { - if (res<-1) { - /* got an error here */ - gslog(LOG_EMERG,"hostlib::error::received error message " - "during initalization\n"); - return -1; - } else { - goto init_read_again; - } - } - } - gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n"); - return -1; - } - return 0; -} - - -static gs_retval_t gscpdetachshm(FTAID target) -{ - struct internal_message i; - - i.im.receiver = target; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_FREE; - if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) { - /* no reason to wait here won't be acked anyway */ - return -1; - } - return 0; -} - -static gs_retval_t gscpdetachsocket(FTAID target) -{ - struct internal_message i; - - i.im.receiver = target; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_UNREGISTER; - if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) { - /* no reason to wait here won't be acked anyway */ - return -1; - } - return -1; -} - - -/* used to disassociate process from clearinghouse */ -gs_retval_t gscpipc_free() -{ - gs_int32_t x; - /* XXX OS if this function is called when there are still - subscribed FTAs for this process the clearinghouse will - crash - */ - - if (clearinghouse!=0) { - return 0; - } else { - gs_int32_t x; - for (x=0; x MAXMSGSZ) { - gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n"); - return -1; - } - i = (struct ipc_message *) b; - i->receiver=f; - i->sender=myid; - i->operation=operation; - i->size=length+sizeof(struct ipc_message); - memcpy(&i->data[0],buf,length); -#ifdef PRINTMSG - i->pmsgid=pmsgid; - pmsgid++; - gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n", - i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length); -#endif - if ((s=shmlist_find(f, SHM_RECV))!=0) { - // set the hint in the ringbuffer that there is something on the shared memory queue - s->buf->mqhint=1; - } - if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) { - gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno); - return -1; - } - return 0; -} - -/* retrieve a message buf has to be at least of size MAXMSGSZ*/ -gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t buf, gs_int32_t * size, gs_int32_t block) -{ - gs_int32_t w; - gs_int32_t x; - gs_int8_t b[MAXSZ]; - gs_int32_t y; - - struct internal_message * i; - struct internal_remote_tuple * it; - gs_int32_t length; - - struct shmlistentry * s; - - i=(struct internal_message *)b; - it=(struct internal_remote_tuple *)b; - - for(y=0;(y < 10) || (block==1);y++) { - shmlist_drain_remote(); - length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1); - if (length < -1) { - /* problem */ - return -1; - } - if (length < 0) { - /* we are nonblocking and have nothing to do */ - if (block==1) { - // we are expected to block for ever if it is 0 or 2 we return - continue; - } - return 0; - } -#ifdef PRINTMSG - gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n", - i->im.pmsgid, i->im.sender, i->im.sender,i->im.operation,i->im.size); -#endif - if (length <0) { - gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno); - return -1; - } - if (i->im.operation != RESERVED_FOR_LOW_LEVEL) { - memcpy(buf,&(i->im.data[0]),i->im.size); - *size=i->im.size-sizeof(struct ipc_message); - *operation=i->im.operation; - *f=i->im.sender; - if ((s=shmlist_find(*f, SHM_SEND))!=0) { - // clear the hint in the ringbuffer to indicate we got the message - s->buf->mqhint=0; - } - return 1; - } - switch (i->lowlevelop) { - case LOWLEVELOP_REGISTER: - /* this should only get called if the process is the clearinghouse */ -#ifdef PRINTMSG - gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port); -#endif - send_ack(i->im.sender); - break; - case LOWLEVELOP_UNREGISTER: -#ifdef PRINTMSG - gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port); -#endif /* remove connection */ - for(x=0;xim.sender.ip) - && (connectionhash[x].remoteid.port==i->im.sender.port)) { - gslog(LOG_EMERG,"Close by remote request %u\n", - connectionhash[x].remoteid.port); - // XXX closed when the other process dies - // can't close it yet since we might have - // some more messages - // close(connectionhash[x].socket); - connectionhash[x].used=0; - } - } - break; - case LOWLEVELOP_SHM_REGISTER: - { - gs_int32_t shmid; - struct ringbuf * r; - struct shmid_ds sms; -#ifdef PRINTMSG - gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n", - i->im.sender.port, - i->shmtoken,i->shmsz); -#endif - if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) { - if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) { - gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno); - send_nack(i->im.sender); - } else { - // Make sure all the momory gets mapped now - for(x=0;xreader,r->writer,r->length,i->shmtoken); -#endif - if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken, - shmid,r,i->shmsz)<0) { - shmdt((gs_sp_t )r); - shmctl(shmid,IPC_RMID,&sms); - gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n"); - send_nack(i->im.sender); - } else { - send_ack(i->im.sender); - } - } - } else { - gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno); - send_nack(i->im.sender); - } - } - break; - case LOWLEVELOP_SHM_REMOTE_REGISTER: - { - gs_int32_t shmid; - struct ringbuf * r; - struct shmid_ds sms; -#ifdef PRINTMSG - gslog(LOG_EMERG,"request to get remote shm %u size %u\n", - i->im.sender.port,i->shmsz); -#endif - if ((r=(struct ringbuf *)malloc(i->shmsz))==0) { - gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno); - send_nack(i->im.sender); - } else { - // make sure all the memory gets mapped now - for(x=0;xreader=0; - r->writer=0; - r->length=i->shmsz; - r->end= i->shmsz-MAXTUPLESZ; - r->destid=i->im.receiver; - r->srcid=i->im.sender; -#ifdef PRINTMSG - gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n" - ,(void *)r,r->reader,r->writer,r->length,i->shmtoken); -#endif - if (shmlist_add(i->im.sender,SHM_SEND,0, - 0,r,i->shmsz)<0) { - gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n"); - send_nack(i->im.sender); - } else { - send_ack(i->im.sender); - } - } - } - break; - case LOWLEVELOP_SHM_FREE: - { - struct shmlistentry * sm; - struct shmid_ds sms; -#ifdef PRINTMSG - gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender); -#endif - if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) { -#ifdef PRINTMSG - gslog(LOG_EMERG,"freeing %u",sm->shmid); -#endif - shmdt((gs_sp_t )sm->buf); - shmctl(sm->shmid,IPC_RMID,&sms); - shmlist_rm(i->im.sender,SHM_SEND); - } - } - break; - case LOWLEVELOP_SHM_REMOTE_FREE: - { - struct shmlistentry * sm; - struct shmid_ds sms; -#ifdef PRINTMSG - gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender); -#endif - if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) { -#ifdef PRINTMSG - gslog(LOG_EMERG,"freeing %u",sm->shmid); -#endif - free((gs_sp_t ) sm->buf); - shmlist_rm(i->im.sender,SHM_SEND); - } - } - break; - case LOWLEVELOP_REMOTE_TUPLE: - { - if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) { -#ifdef PRINTMSG - gslog(LOG_EMERG,"Received remote ringbuf message " - "for message of size %u\n", - it->size,it->im.size); -#endif -#ifdef BLOCKRINGBUFFER - while (SPACETOWRITE(s->buf)==0) { - usleep(1000); - gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for."); - } - memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); - ADVANCEWRITE(s->buf); -#else - if (SPACETOWRITE(s->buf)) { - memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); - ADVANCEWRITE(s->buf); - } else { - // gslog(LOG_EMERG,"r+"); - } -#endif - } else { - gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n"); - } - } - break; - default: - gslog(LOG_EMERG,"hostlib::error::unexpected message received\n"); - return -1; - } - } - return 0; -} - -/* allocate a ringbuffer which allows receiving data from - * the other process. returns 0 if didn't succeed and - * returns an existing buffer if it exists */ -struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length) -{ - struct shmid_ds sms; - struct shmlistentry * se; - gs_int8_t keybuf[1024]; - key_t shmtoken=0; - gs_int32_t shmid=0; - struct ringbuf * r; - struct internal_message i; - gs_int32_t x; - - se = shmlist_find(f, SHM_RECV); - if (se) { - return se->buf; - } - - if (length<(4*MAXTUPLESZ)) { - gslog(LOG_EMERG, - "ERROR:buffersize in gscpipc_createshm has to be " - "at least %u Bytes long\n", - 4*MAXTUPLESZ); - return 0; - } - - if (myid.ip == f.ip) { - sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port); - if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1) { - gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n"); - return 0; - } - close(x); - - if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) { - gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n"); - return 0; - } - - if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL| - IPC_CREAT|IPC_EXCL))==-1) { - gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno); - return 0; - } - if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) { - gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n"); - return 0; - } - /* touch all memory once to map/reserve it now */ - for(x=0;xreader=0; - r->writer=0; - r->length=length; - r->end= r->length-MAXTUPLESZ; - r->destid=f; - r->srcid=myid; - i.im.receiver = f; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_REGISTER; - i.shmtoken = shmtoken; - i.shmsz=length; - if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) { - if (wait_for_lowlevel_ack()<0) { - shmdt((gs_sp_t )r); - shmctl(shmid,IPC_RMID,&sms); - return 0; - } - } - shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */ - if (shmlist_add(f, SHM_RECV, shmtoken, - shmid, r, length) <0) { - shmdt((gs_sp_t )r); - shmctl(shmid,IPC_RMID,&sms); - i.im.receiver = f; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_FREE; - i.shmtoken = shmtoken; - msg_send(f,(gs_sp_t )&i,i.im.size,1); - return 0; - } - } else { - /* remote shared memory */ - if ((r=(struct ringbuf *)malloc(length))==0) { - gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n"); - return 0; - } - /* touch all memory once to map/reserve it now */ - for(x=0;xreader=0; - r->writer=0; - r->length=length; - r->end= r->length-MAXTUPLESZ; - r->destid=f; - r->srcid=myid; - i.im.receiver = f; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER; - i.shmtoken = shmtoken; - i.shmsz=length; - if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) { - if (wait_for_lowlevel_ack()<0) { - free(r); - return 0; - } - } - if (shmlist_add(f, SHM_RECV, 0, - 0, r, length) <0) { - free(r); - i.im.receiver = f; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE; - msg_send(f,(gs_sp_t )&i,i.im.size,1); - return 0; - } - } - return r; - -} - -/* finds a ringbuffer to send which was allocated by - * gscpipc_creatshm and return 0 on an error */ - -struct ringbuf * gscpipc_getshm(FTAID f) -{ - struct shmlistentry * se; - gs_int32_t recmsgid; - se = shmlist_find(f, SHM_SEND); - if (se) { - return se->buf; - } - return 0; -} - -/* frees shared memory to a particular proccess identified - * by MSGID - */ -gs_retval_t gscpipc_freeshm(FTAID f) -{ - struct internal_message i; - struct shmlistentry * sm; - struct shmid_ds sms; - if (myid.ip == f.ip) { - if ((sm=shmlist_find(f, SHM_RECV)) <0) { - shmdt((gs_sp_t )sm->buf); - shmctl(sm->shmid,IPC_RMID,&sms); - i.im.receiver = f; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_FREE; - i.shmtoken = sm->shmtoken; - msg_send(f,(gs_sp_t )&i,i.im.size,1); - shmlist_rm(f, SHM_RECV); - } - } else { - if ((sm=shmlist_find(f, SHM_RECV)) <0) { - free((gs_sp_t )sm->buf); - i.im.receiver = f; - i.im.sender = myid; - i.im.operation = RESERVED_FOR_LOW_LEVEL; - i.im.size = sizeof(struct internal_message); - i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE; - msg_send(f,(gs_sp_t )&i,i.im.size,1); - shmlist_rm(f, SHM_RECV); - } - } - return 0; -} - -gs_retval_t gscpipc_mqhint() -{ - gs_int32_t x; - for (x=0; xmqhint) { - return 1; - } - } - } - return 0; -} +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ +#include "gsconfig.h" +#include "gstypes.h" +#include "gscpipc.h" +#include "gshub.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __linux__ +#include +#include +#include +#endif + +#ifndef socklen_t +#define socklen_t gs_uint32_t +#endif + + +struct FTAID clearinghouseftaid; + + + +struct connection { + gs_int32_t socket; /* socket for connection */ + gs_int32_t used; /* 1 if the entry is in use */ + FTAID remoteid; /* remoteid of connection */ +}; + +struct connection connectionhash[SOCKET_HASH_SZ]; + +gs_int32_t clearinghouse=0; + +#define SHMTYPE 's' +#define SHM_RECV 1 +#define SHM_SEND 2 + +struct shmlistentry { + FTAID msgid; + gs_int32_t type; + key_t shmtoken; + gs_int32_t shmid; + struct ringbuf * buf; + gs_int32_t buffsize; +}; + +gs_int32_t shmlistlen=0; +struct shmlistentry shmlist[MAX_NUMBER_OF_SHM]; + +struct sq { + gs_int8_t buf[MAXMSGSZ]; + gs_int32_t length; + struct sq * next; +}; + +static struct sq * sqtop=0; +static struct sq * sqtail=0; + +/* adds a buffer to the end of the sidequeue*/ +gs_retval_t gscpipc_sidequeue_append(gs_sp_t buf, gs_int32_t length) +{ + struct sq * s; + if ((s=malloc(sizeof(struct sq)))==0) { + gslog(LOG_EMERG,"Could not allocate memory for sidequeue"); + return -1; + } + memcpy(&s->buf[0],buf,MAXMSGSZ); + s->length=length; + s->next=0; + if (sqtail) { + sqtail->next=s; + sqtail=s; + } else { + sqtop = s; + sqtail = s; + } + return 0; +} + +/* removes a buffer from the top of the sidequeue*/ +gs_retval_t gscpipc_sidequeue_pop(gs_sp_t buf, gs_int32_t * length, gs_int32_t buflen) +{ + struct sq * s; + if (sqtop) { + if (sqtop->length > buflen) { + return -2; + } + memcpy(buf,&sqtop->buf[0],sqtop->length); + *length=sqtop->length; + s=sqtop; + sqtop=sqtop->next; + if (sqtop==0) sqtail=0; + free(s); + return 0; + } + return -1; +} + +struct ipc_message { + FTAID receiver; + FTAID sender; + gs_int32_t operation; +#ifdef PRINTMSG + gs_int32_t pmsgid; +#endif + gs_int32_t size; + gs_int8_t data[4]; +}; + +#define LOWLEVELOP_ACK 0 +#define LOWLEVELOP_NACK 1 +#define LOWLEVELOP_REGISTER 2 +#define LOWLEVELOP_UNREGISTER 3 +#define LOWLEVELOP_SHM_REGISTER 4 +#define LOWLEVELOP_SHM_FREE 5 +#define LOWLEVELOP_SHM_REMOTE_REGISTER 6 +#define LOWLEVELOP_SHM_REMOTE_FREE 7 +#define LOWLEVELOP_REMOTE_TUPLE 8 + +struct internal_message{ + struct ipc_message im; + int lowlevelop; + key_t shmtoken; + int shmsz; + int result; +}; + +struct internal_remote_tuple{ + struct ipc_message im; + int lowlevelop; + int size; + gs_int8_t data[4]; +}; + +FTAID myid; +int listen_socket; + +#ifdef PRINTMSG +int pmsgid=0; +#endif + + +struct shmlistentry * shmlist_find(FTAID msgid, int type) +{ + int x; + for (x=0; x=MAX_NUMBER_OF_SHM) { + gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many" + "shm registered\n"); + return -1; + } + shmlist[shmlistlen].msgid=msgid; + shmlist[shmlistlen].type=type; + shmlist[shmlistlen].shmtoken=shmtoken; + shmlist[shmlistlen].shmid=shmid; + shmlist[shmlistlen].buf=buf; + shmlist[shmlistlen].buffsize=buffsize; + shmlistlen++; + return 0; +} + +static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type) +{ + gs_int32_t x; + gs_int32_t move=0; + for (x=0;x0) { + if ((rv=write(socket,buf,sz))<0) { + if (errno == EINTR) + continue; + else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS + return 0; + else + return -1; + } + sz-=rv; + buf+=rv; + } + return res; +} + +static gs_retval_t msg_send(FTAID id, gs_sp_t buf, gs_uint32_t len, gs_uint32_t block) { + struct sockaddr_in sin; + gs_int32_t x; + gs_int32_t u; + gs_int32_t sz; + gs_int32_t ret; + +try_send_again: + u=-1; + for(x=0;x=SOCKET_HASH_SZ) || (u<0)) { + gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum" + " TCP connection limit sending %d\n",u); + return -1; + } + connectionhash[u].remoteid=id; + connectionhash[u].remoteid.index=0; + connectionhash[u].remoteid.streamid=0; + sin.sin_family = AF_INET; +#ifdef __NetBSD__ + sin.sin_len = sizeof(sin); +#endif + sin.sin_addr.s_addr = htonl(id.ip); + sin.sin_port = htons(id.port); + if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n"); + return -1; + } + if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n"); + return -1; + } + sz=SOCK_BUF_SZ; + if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF, + (gs_sp_t )&sz, sizeof(sz)) != 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n"); + return -1; + } + sz=SOCK_BUF_SZ; + if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF, + (gs_sp_t )&sz, sizeof(sz)) != 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n"); + return -1; + } +#ifndef __linux__ + // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead + sz=MAXSZ+4; + if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT, + (gs_sp_t )&sz, sizeof(sz)) != 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno); + return -1; + } +#endif + sz=1; + if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE, + (gs_sp_t )&sz, sizeof(sz)) != 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n"); + return -1; + } + connectionhash[u].used=1; + goto try_send_again; + return -1; +} + +static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) { + gs_int32_t rv; + gs_sp_t buf = (gs_sp_t )b; + gs_int32_t res=sz; + while(sz>0) { + if ((rv=read(socket,buf,sz))<0) { + if (errno == EINTR) + continue; + gslog(LOG_EMERG,"read with error number %u \n",errno); + return -1; + } + if (rv==0) { + return 0; + } + sz-=rv; + buf+=rv; + } + return res; +} + +/* msg_recv return len if data is available -1 for a timeout and -2 for an error */ +static gs_retval_t msg_recv(gs_sp_t buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) { + struct sockaddr_in sin; + socklen_t sl; + fd_set fs; + gs_int32_t x,y; + gs_int32_t n; + gs_int32_t length; + struct timeval tv; + gs_int32_t sret; + static gs_int32_t last=0; + gs_uint32_t sz; + + if (check_sideque==1) { + if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) { + return length; + } + if (x==-2) { + gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n"); + return -2; + } + } + +read_again: + if (block==0) { + tv.tv_sec = 0; + tv.tv_usec = 0; + } else { + tv.tv_sec = 0; + tv.tv_usec = 100000; + } + FD_ZERO(&fs); + FD_SET(listen_socket,&fs); + n=listen_socket; + for(x=0;x=SOCKET_HASH_SZ) { + gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum" + "TCP connection limit accepting\n"); + goto read_again; + } + sl=sizeof(sin); + if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl)) + < 0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n"); + goto read_again; + } + + sz=SOCK_BUF_SZ; + if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF, + (gs_sp_t )&sz, sizeof(sz)) != 0) { + fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n"); + return -1; + } + sz=SOCK_BUF_SZ; + if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF, + (gs_sp_t )&sz, sizeof(sz)) != 0) { + fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n"); + return -1; + } + + sl=sizeof(sin); + if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) { + gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n"); + close(connectionhash[x].socket); + goto read_again; + } + connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr); + connectionhash[x].remoteid.port=ntohs(sin.sin_port); + connectionhash[x].remoteid.index=0; + connectionhash[x].remoteid.streamid=0; + connectionhash[x].used=1; +#ifdef PRINTMSG + gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port); +#endif + goto read_again; + } + for(x=0;x0) { + if ((i->im.operation == RESERVED_FOR_LOW_LEVEL) + && (( i->lowlevelop == LOWLEVELOP_ACK) + || ( i->lowlevelop == LOWLEVELOP_NACK))) { + if ( i->lowlevelop == LOWLEVELOP_ACK) { + return 0; + } else { + return 1; + } + } + if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) { + struct internal_remote_tuple * it; + struct shmlistentry * s; + it=(struct internal_remote_tuple *)b; + + if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) { +#ifdef PRINTMSG + gslog(LOG_EMERG,"Received remote ringbuf message " + "for message of size %u\n", + it->size,it->im.size); +#endif +#ifdef BLOCKRINGBUFFER + while (SPACETOWRITE(s->buf)==0) { + usleep(1000); + gslog(LOG_ERR,"Dead in the water we can't " + "drain the ringbuffer we wait for."); + } + memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); + ADVANCEWRITE(s->buf); +#else + if (SPACETOWRITE(s->buf)) { + memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); + ADVANCEWRITE(s->buf); + } else { + // gslog(LOG_EMERG,"r+"); + } +#endif + } else { + gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n"); + } + } else { + gscpipc_sidequeue_append(b, res); + } + } else { + if (res < -1) { + /* got an error here */ + gslog(LOG_EMERG,"hostlib::error::received error " + "during wait for low level ack\n"); + return -1; + } + } + } + /* never reached */ + return -1; +} + +void shmlist_drain_remote() +{ + gs_int32_t x; + gs_int8_t buf[MAXSZ]; + struct internal_remote_tuple * it; + it = (struct internal_remote_tuple *) buf; + for (x=0; xim.receiver = shmlist[x].msgid; + it->im.sender = myid; + it->im.operation = RESERVED_FOR_LOW_LEVEL; + it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE; + it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1; + it->im.size = sizeof(struct internal_remote_tuple)-4+it->size; + memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size); +#ifdef PRINTMSG + gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n", + it->size, it->im.size); +#endif + if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) { + break; + } + ADVANCEREAD(shmlist[x].buf); + } + } + } +} + +/* + *used to contact the clearinghouse process returns the MSGID of + * the current process + */ +gs_retval_t gscpipc_init(gs_int32_t clearinghouse) +{ + struct internal_message i; + key_t msqtoken=0; + gs_int32_t x; + endpoint gshub; + endpoint tmpclearinghouse; + + /* make sure priveleges can be set */ + umask(0); + + clearinghouseftaid.index=0; + clearinghouseftaid.streamid=0; + + if (get_hub(&gshub)!=0) { + gslog(LOG_EMERG,"hostlib::error::could get hub\n"); + return -1; + } + + if (clearinghouse!=0) { + // This is the clearinghouse + gs_int8_t buf[MAXMSGSZ]; + + if (msg_init(1)<0) { + gslog(LOG_EMERG,"hostlib::error::could not init msgq\n"); + return -1; + } + + clearinghouseftaid.ip=myid.ip; + clearinghouseftaid.port=myid.port; + + tmpclearinghouse.ip=htonl(clearinghouseftaid.ip); + tmpclearinghouse.port=htons(clearinghouseftaid.port); + + if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) { + gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance"); + return -1; + } + + return 0; + + } else { + // This is an lfta/hfta/app + gs_int32_t res; + + if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) { + gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n"); + return -1; + } + + clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip); + clearinghouseftaid.port=ntohs(tmpclearinghouse.port); + + + if (msg_init(0)<0) { + gslog(LOG_EMERG,"hostlib::error::could not init msgq\n"); + return -1; + } + + i.im.receiver = clearinghouseftaid; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_REGISTER; +#ifdef PRINTMSG + i.im.pmsgid=pmsgid; + pmsgid++; + gslog(LOG_EMERG,"send a message (%d.%u) to %u with op " + "%u with size %u\n", + i.im.pmsgid,i.im.receiver.port,i.im.operation, + i.im.size); +#endif + if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) { + /* we can wait her for an ack since nobody should know + about us yet */ + init_read_again: + if ((res=msg_recv((gs_sp_t )&i, + sizeof(struct internal_message), + 1,0))>=0) { + if (i.lowlevelop == LOWLEVELOP_ACK) { + return 0; + } else { + gslog(LOG_EMERG,"hostlib::error::received unexpected message " + "during initalization\n"); + return -1; + } + } else { + if (res<-1) { + /* got an error here */ + gslog(LOG_EMERG,"hostlib::error::received error message " + "during initalization\n"); + return -1; + } else { + goto init_read_again; + } + } + } + gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n"); + return -1; + } + return 0; +} + + +static gs_retval_t gscpdetachshm(FTAID target) +{ + struct internal_message i; + + i.im.receiver = target; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_FREE; + if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) { + /* no reason to wait here won't be acked anyway */ + return -1; + } + return 0; +} + +static gs_retval_t gscpdetachsocket(FTAID target) +{ + struct internal_message i; + + i.im.receiver = target; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_UNREGISTER; + if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) { + /* no reason to wait here won't be acked anyway */ + return -1; + } + return -1; +} + + +/* used to disassociate process from clearinghouse */ +gs_retval_t gscpipc_free() +{ + gs_int32_t x; + /* XXX OS if this function is called when there are still + subscribed FTAs for this process the clearinghouse will + crash + */ + + if (clearinghouse!=0) { + return 0; + } else { + gs_int32_t x; + for (x=0; x MAXMSGSZ) { + gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n"); + return -1; + } + i = (struct ipc_message *) b; + i->receiver=f; + i->sender=myid; + i->operation=operation; + i->size=length+sizeof(struct ipc_message); + memcpy(&i->data[0],buf,length); +#ifdef PRINTMSG + i->pmsgid=pmsgid; + pmsgid++; + gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n", + i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length); +#endif + if ((s=shmlist_find(f, SHM_RECV))!=0) { + // set the hint in the ringbuffer that there is something on the shared memory queue + s->buf->mqhint=1; + } + if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) { + gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno); + return -1; + } + return 0; +} + +/* retrieve a message buf has to be at least of size MAXMSGSZ*/ +gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t buf, gs_int32_t * size, gs_int32_t block) +{ + gs_int32_t w; + gs_int32_t x; + gs_int8_t b[MAXSZ]; + gs_int32_t y; + + struct internal_message * i; + struct internal_remote_tuple * it; + gs_int32_t length; + + struct shmlistentry * s; + + i=(struct internal_message *)b; + it=(struct internal_remote_tuple *)b; + + for(y=0;(y < 10) || (block==1);y++) { + shmlist_drain_remote(); + length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1); + if (length < -1) { + /* problem */ + return -1; + } + if (length < 0) { + /* we are nonblocking and have nothing to do */ + if (block==1) { + // we are expected to block for ever if it is 0 or 2 we return + continue; + } + return 0; + } +#ifdef PRINTMSG + gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n", + i->im.pmsgid, i->im.sender, i->im.sender,i->im.operation,i->im.size); +#endif + if (length <0) { + gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno); + return -1; + } + if (i->im.operation != RESERVED_FOR_LOW_LEVEL) { + memcpy(buf,&(i->im.data[0]),i->im.size); + *size=i->im.size-sizeof(struct ipc_message); + *operation=i->im.operation; + *f=i->im.sender; + if ((s=shmlist_find(*f, SHM_SEND))!=0) { + // clear the hint in the ringbuffer to indicate we got the message + s->buf->mqhint=0; + } + return 1; + } + switch (i->lowlevelop) { + case LOWLEVELOP_REGISTER: + /* this should only get called if the process is the clearinghouse */ +#ifdef PRINTMSG + gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port); +#endif + send_ack(i->im.sender); + break; + case LOWLEVELOP_UNREGISTER: +#ifdef PRINTMSG + gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port); +#endif /* remove connection */ + for(x=0;xim.sender.ip) + && (connectionhash[x].remoteid.port==i->im.sender.port)) { + gslog(LOG_EMERG,"Close by remote request %u\n", + connectionhash[x].remoteid.port); + // XXX closed when the other process dies + // can't close it yet since we might have + // some more messages + // close(connectionhash[x].socket); + connectionhash[x].used=0; + } + } + break; + case LOWLEVELOP_SHM_REGISTER: + { + gs_int32_t shmid; + struct ringbuf * r; + struct shmid_ds sms; +#ifdef PRINTMSG + gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n", + i->im.sender.port, + i->shmtoken,i->shmsz); +#endif + if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) { + if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) { + gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno); + send_nack(i->im.sender); + } else { + // Make sure all the momory gets mapped now + for(x=0;xreader,r->writer,r->length,i->shmtoken); +#endif + if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken, + shmid,r,i->shmsz)<0) { + shmdt((gs_sp_t )r); + shmctl(shmid,IPC_RMID,&sms); + gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n"); + send_nack(i->im.sender); + } else { + send_ack(i->im.sender); + } + } + } else { + gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno); + send_nack(i->im.sender); + } + } + break; + case LOWLEVELOP_SHM_REMOTE_REGISTER: + { + gs_int32_t shmid; + struct ringbuf * r; + struct shmid_ds sms; +#ifdef PRINTMSG + gslog(LOG_EMERG,"request to get remote shm %u size %u\n", + i->im.sender.port,i->shmsz); +#endif + if ((r=(struct ringbuf *)malloc(i->shmsz))==0) { + gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno); + send_nack(i->im.sender); + } else { + // make sure all the memory gets mapped now + for(x=0;xreader=0; + r->writer=0; + r->length=i->shmsz; + r->end= i->shmsz-MAXTUPLESZ; + r->destid=i->im.receiver; + r->srcid=i->im.sender; +#ifdef PRINTMSG + gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n" + ,(void *)r,r->reader,r->writer,r->length,i->shmtoken); +#endif + if (shmlist_add(i->im.sender,SHM_SEND,0, + 0,r,i->shmsz)<0) { + gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n"); + send_nack(i->im.sender); + } else { + send_ack(i->im.sender); + } + } + } + break; + case LOWLEVELOP_SHM_FREE: + { + struct shmlistentry * sm; + struct shmid_ds sms; +#ifdef PRINTMSG + gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender); +#endif + if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) { +#ifdef PRINTMSG + gslog(LOG_EMERG,"freeing %u",sm->shmid); +#endif + shmdt((gs_sp_t )sm->buf); + shmctl(sm->shmid,IPC_RMID,&sms); + shmlist_rm(i->im.sender,SHM_SEND); + } + } + break; + case LOWLEVELOP_SHM_REMOTE_FREE: + { + struct shmlistentry * sm; + struct shmid_ds sms; +#ifdef PRINTMSG + gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender); +#endif + if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) { +#ifdef PRINTMSG + gslog(LOG_EMERG,"freeing %u",sm->shmid); +#endif + free((gs_sp_t ) sm->buf); + shmlist_rm(i->im.sender,SHM_SEND); + } + } + break; + case LOWLEVELOP_REMOTE_TUPLE: + { + if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) { +#ifdef PRINTMSG + gslog(LOG_EMERG,"Received remote ringbuf message " + "for message of size %u\n", + it->size,it->im.size); +#endif +#ifdef BLOCKRINGBUFFER + while (SPACETOWRITE(s->buf)==0) { + usleep(1000); + gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for."); + } + memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); + ADVANCEWRITE(s->buf); +#else + if (SPACETOWRITE(s->buf)) { + memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); + ADVANCEWRITE(s->buf); + } else { + // gslog(LOG_EMERG,"r+"); + } +#endif + } else { + gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n"); + } + } + break; + default: + gslog(LOG_EMERG,"hostlib::error::unexpected message received\n"); + return -1; + } + } + return 0; +} + +/* allocate a ringbuffer which allows receiving data from + * the other process. returns 0 if didn't succeed and + * returns an existing buffer if it exists */ +struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length) +{ + struct shmid_ds sms; + struct shmlistentry * se; + gs_int8_t keybuf[1024]; + key_t shmtoken=0; + gs_int32_t shmid=0; + struct ringbuf * r; + struct internal_message i; + gs_int32_t x; + + se = shmlist_find(f, SHM_RECV); + if (se) { + return se->buf; + } + + if (length<(4*MAXTUPLESZ)) { + gslog(LOG_EMERG, + "ERROR:buffersize in gscpipc_createshm has to be " + "at least %u Bytes long\n", + 4*MAXTUPLESZ); + return 0; + } + + if (myid.ip == f.ip) { + sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port); + if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1) { + gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n"); + return 0; + } + close(x); + + if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) { + gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n"); + return 0; + } + + if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL| + IPC_CREAT|IPC_EXCL))==-1) { + gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno); + return 0; + } + if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) { + gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n"); + return 0; + } + /* touch all memory once to map/reserve it now */ + for(x=0;xreader=0; + r->writer=0; + r->length=length; + r->end= r->length-MAXTUPLESZ; + r->destid=f; + r->srcid=myid; + i.im.receiver = f; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_REGISTER; + i.shmtoken = shmtoken; + i.shmsz=length; + if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) { + if (wait_for_lowlevel_ack()<0) { + shmdt((gs_sp_t )r); + shmctl(shmid,IPC_RMID,&sms); + return 0; + } + } + shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */ + if (shmlist_add(f, SHM_RECV, shmtoken, + shmid, r, length) <0) { + shmdt((gs_sp_t )r); + shmctl(shmid,IPC_RMID,&sms); + i.im.receiver = f; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_FREE; + i.shmtoken = shmtoken; + msg_send(f,(gs_sp_t )&i,i.im.size,1); + return 0; + } + } else { + /* remote shared memory */ + if ((r=(struct ringbuf *)malloc(length))==0) { + gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n"); + return 0; + } + /* touch all memory once to map/reserve it now */ + for(x=0;xreader=0; + r->writer=0; + r->length=length; + r->end= r->length-MAXTUPLESZ; + r->destid=f; + r->srcid=myid; + i.im.receiver = f; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER; + i.shmtoken = shmtoken; + i.shmsz=length; + if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) { + if (wait_for_lowlevel_ack()<0) { + free(r); + return 0; + } + } + if (shmlist_add(f, SHM_RECV, 0, + 0, r, length) <0) { + free(r); + i.im.receiver = f; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE; + msg_send(f,(gs_sp_t )&i,i.im.size,1); + return 0; + } + } + return r; + +} + +/* finds a ringbuffer to send which was allocated by + * gscpipc_creatshm and return 0 on an error */ + +struct ringbuf * gscpipc_getshm(FTAID f) +{ + struct shmlistentry * se; + gs_int32_t recmsgid; + se = shmlist_find(f, SHM_SEND); + if (se) { + return se->buf; + } + return 0; +} + +/* frees shared memory to a particular proccess identified + * by MSGID + */ +gs_retval_t gscpipc_freeshm(FTAID f) +{ + struct internal_message i; + struct shmlistentry * sm; + struct shmid_ds sms; + if (myid.ip == f.ip) { + if ((sm=shmlist_find(f, SHM_RECV)) <0) { + shmdt((gs_sp_t )sm->buf); + shmctl(sm->shmid,IPC_RMID,&sms); + i.im.receiver = f; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_FREE; + i.shmtoken = sm->shmtoken; + msg_send(f,(gs_sp_t )&i,i.im.size,1); + shmlist_rm(f, SHM_RECV); + } + } else { + if ((sm=shmlist_find(f, SHM_RECV)) <0) { + free((gs_sp_t )sm->buf); + i.im.receiver = f; + i.im.sender = myid; + i.im.operation = RESERVED_FOR_LOW_LEVEL; + i.im.size = sizeof(struct internal_message); + i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE; + msg_send(f,(gs_sp_t )&i,i.im.size,1); + shmlist_rm(f, SHM_RECV); + } + } + return 0; +} + +gs_retval_t gscpipc_mqhint() +{ + gs_int32_t x; + for (x=0; xmqhint) { + return 1; + } + } + } + return 0; +}