/* ------------------------------------------------ Copyright 2014 AT&T Intellectual Property Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ------------------------------------------- */ #include "gsconfig.h" #include "gstypes.h" #include "gscpipc.h" #include "gshub.h" #include #include #include #include #include #include #include #include #include #include #ifdef __linux__ #include #include #include #endif #ifndef socklen_t #define socklen_t gs_uint32_t #endif struct FTAID clearinghouseftaid; struct connection { gs_int32_t socket; /* socket for connection */ gs_int32_t used; /* 1 if the entry is in use */ FTAID remoteid; /* remoteid of connection */ }; struct connection connectionhash[SOCKET_HASH_SZ]; gs_int32_t clearinghouse=0; #define SHMTYPE 's' #define SHM_RECV 1 #define SHM_SEND 2 struct shmlistentry { FTAID msgid; gs_int32_t type; key_t shmtoken; gs_int32_t shmid; struct ringbuf * buf; gs_int32_t buffsize; }; gs_int32_t shmlistlen=0; struct shmlistentry shmlist[MAX_NUMBER_OF_SHM]; struct sq { gs_int8_t buf[MAXMSGSZ]; gs_int32_t length; struct sq * next; }; static struct sq * sqtop=0; static struct sq * sqtail=0; /* adds a buffer to the end of the sidequeue*/ gs_retval_t gscpipc_sidequeue_append(gs_sp_t buf, gs_int32_t length) { struct sq * s; if ((s=malloc(sizeof(struct sq)))==0) { gslog(LOG_EMERG,"Could not allocate memory for sidequeue"); return -1; } memcpy(&s->buf[0],buf,MAXMSGSZ); s->length=length; s->next=0; if (sqtail) { sqtail->next=s; sqtail=s; } else { sqtop = s; sqtail = s; } return 0; } /* removes a buffer from the top of the sidequeue*/ gs_retval_t gscpipc_sidequeue_pop(gs_sp_t buf, gs_int32_t * length, gs_int32_t buflen) { struct sq * s; if (sqtop) { if (sqtop->length > buflen) { return -2; } memcpy(buf,&sqtop->buf[0],sqtop->length); *length=sqtop->length; s=sqtop; sqtop=sqtop->next; if (sqtop==0) sqtail=0; free(s); return 0; } return -1; } struct ipc_message { FTAID receiver; FTAID sender; gs_int32_t operation; #ifdef PRINTMSG gs_int32_t pmsgid; #endif gs_int32_t size; gs_int8_t data[4]; }; #define LOWLEVELOP_ACK 0 #define LOWLEVELOP_NACK 1 #define LOWLEVELOP_REGISTER 2 #define LOWLEVELOP_UNREGISTER 3 #define LOWLEVELOP_SHM_REGISTER 4 #define LOWLEVELOP_SHM_FREE 5 #define LOWLEVELOP_SHM_REMOTE_REGISTER 6 #define LOWLEVELOP_SHM_REMOTE_FREE 7 #define LOWLEVELOP_REMOTE_TUPLE 8 struct internal_message{ struct ipc_message im; int lowlevelop; key_t shmtoken; int shmsz; int result; }; struct internal_remote_tuple{ struct ipc_message im; int lowlevelop; int size; gs_int8_t data[4]; }; FTAID myid; int listen_socket; #ifdef PRINTMSG int pmsgid=0; #endif struct shmlistentry * shmlist_find(FTAID msgid, int type) { int x; for (x=0; x=MAX_NUMBER_OF_SHM) { gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many" "shm registered\n"); return -1; } shmlist[shmlistlen].msgid=msgid; shmlist[shmlistlen].type=type; shmlist[shmlistlen].shmtoken=shmtoken; shmlist[shmlistlen].shmid=shmid; shmlist[shmlistlen].buf=buf; shmlist[shmlistlen].buffsize=buffsize; shmlistlen++; return 0; } static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type) { gs_int32_t x; gs_int32_t move=0; for (x=0;x0) { if ((rv=write(socket,buf,sz))<0) { if (errno == EINTR) continue; else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS return 0; else return -1; } sz-=rv; buf+=rv; } return res; } static gs_retval_t msg_send(FTAID id, gs_sp_t buf, gs_uint32_t len, gs_uint32_t block) { struct sockaddr_in sin; gs_int32_t x; gs_int32_t u; gs_int32_t sz; gs_int32_t ret; try_send_again: u=-1; for(x=0;x=SOCKET_HASH_SZ) || (u<0)) { gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum" " TCP connection limit sending %d\n",u); return -1; } connectionhash[u].remoteid=id; connectionhash[u].remoteid.index=0; connectionhash[u].remoteid.streamid=0; sin.sin_family = AF_INET; #ifdef __NetBSD__ sin.sin_len = sizeof(sin); #endif sin.sin_addr.s_addr = htonl(id.ip); sin.sin_port = htons(id.port); if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n"); return -1; } if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n"); return -1; } sz=SOCK_BUF_SZ; if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF, (gs_sp_t )&sz, sizeof(sz)) != 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n"); return -1; } sz=SOCK_BUF_SZ; if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF, (gs_sp_t )&sz, sizeof(sz)) != 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n"); return -1; } #ifndef __linux__ // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead sz=MAXSZ+4; if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT, (gs_sp_t )&sz, sizeof(sz)) != 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno); return -1; } #endif sz=1; if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE, (gs_sp_t )&sz, sizeof(sz)) != 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n"); return -1; } connectionhash[u].used=1; goto try_send_again; return -1; } static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) { gs_int32_t rv; gs_sp_t buf = (gs_sp_t )b; gs_int32_t res=sz; while(sz>0) { if ((rv=read(socket,buf,sz))<0) { if (errno == EINTR) continue; gslog(LOG_EMERG,"read with error number %u \n",errno); return -1; } if (rv==0) { return 0; } sz-=rv; buf+=rv; } return res; } /* msg_recv return len if data is available -1 for a timeout and -2 for an error */ static gs_retval_t msg_recv(gs_sp_t buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) { struct sockaddr_in sin; socklen_t sl; fd_set fs; gs_int32_t x,y; gs_int32_t n; gs_int32_t length; struct timeval tv; gs_int32_t sret; static gs_int32_t last=0; gs_uint32_t sz; if (check_sideque==1) { if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) { return length; } if (x==-2) { gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n"); return -2; } } read_again: if (block==0) { tv.tv_sec = 0; tv.tv_usec = 0; } else { tv.tv_sec = 0; tv.tv_usec = 100000; } FD_ZERO(&fs); FD_SET(listen_socket,&fs); n=listen_socket; for(x=0;x=SOCKET_HASH_SZ) { gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum" "TCP connection limit accepting\n"); goto read_again; } sl=sizeof(sin); if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl)) < 0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n"); goto read_again; } sz=SOCK_BUF_SZ; if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF, (gs_sp_t )&sz, sizeof(sz)) != 0) { fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n"); return -1; } sz=SOCK_BUF_SZ; if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF, (gs_sp_t )&sz, sizeof(sz)) != 0) { fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n"); return -1; } sl=sizeof(sin); if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) { gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n"); close(connectionhash[x].socket); goto read_again; } connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr); connectionhash[x].remoteid.port=ntohs(sin.sin_port); connectionhash[x].remoteid.index=0; connectionhash[x].remoteid.streamid=0; connectionhash[x].used=1; #ifdef PRINTMSG gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port); #endif goto read_again; } for(x=0;x0) { if ((i->im.operation == RESERVED_FOR_LOW_LEVEL) && (( i->lowlevelop == LOWLEVELOP_ACK) || ( i->lowlevelop == LOWLEVELOP_NACK))) { if ( i->lowlevelop == LOWLEVELOP_ACK) { return 0; } else { return 1; } } if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) { struct internal_remote_tuple * it; struct shmlistentry * s; it=(struct internal_remote_tuple *)b; if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) { #ifdef PRINTMSG gslog(LOG_EMERG,"Received remote ringbuf message " "for message of size %u\n", it->size,it->im.size); #endif #ifdef BLOCKRINGBUFFER while (SPACETOWRITE(s->buf)==0) { usleep(1000); gslog(LOG_ERR,"Dead in the water we can't " "drain the ringbuffer we wait for."); } memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); ADVANCEWRITE(s->buf); #else if (SPACETOWRITE(s->buf)) { memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); ADVANCEWRITE(s->buf); } else { // gslog(LOG_EMERG,"r+"); } #endif } else { gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n"); } } else { gscpipc_sidequeue_append(b, res); } } else { if (res < -1) { /* got an error here */ gslog(LOG_EMERG,"hostlib::error::received error " "during wait for low level ack\n"); return -1; } } } /* never reached */ return -1; } void shmlist_drain_remote() { gs_int32_t x; gs_int8_t buf[MAXSZ]; struct internal_remote_tuple * it; it = (struct internal_remote_tuple *) buf; for (x=0; xim.receiver = shmlist[x].msgid; it->im.sender = myid; it->im.operation = RESERVED_FOR_LOW_LEVEL; it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE; it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1; it->im.size = sizeof(struct internal_remote_tuple)-4+it->size; memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size); #ifdef PRINTMSG gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n", it->size, it->im.size); #endif if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) { break; } ADVANCEREAD(shmlist[x].buf); } } } } /* *used to contact the clearinghouse process returns the MSGID of * the current process */ gs_retval_t gscpipc_init(gs_int32_t clearinghouse) { struct internal_message i; key_t msqtoken=0; gs_int32_t x; endpoint gshub; endpoint tmpclearinghouse; /* make sure priveleges can be set */ umask(0); clearinghouseftaid.index=0; clearinghouseftaid.streamid=0; if (get_hub(&gshub)!=0) { gslog(LOG_EMERG,"hostlib::error::could get hub\n"); return -1; } if (clearinghouse!=0) { // This is the clearinghouse gs_int8_t buf[MAXMSGSZ]; if (msg_init(1)<0) { gslog(LOG_EMERG,"hostlib::error::could not init msgq\n"); return -1; } clearinghouseftaid.ip=myid.ip; clearinghouseftaid.port=myid.port; tmpclearinghouse.ip=htonl(clearinghouseftaid.ip); tmpclearinghouse.port=htons(clearinghouseftaid.port); if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) { gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance"); return -1; } return 0; } else { // This is an lfta/hfta/app gs_int32_t res; if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) { gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n"); return -1; } clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip); clearinghouseftaid.port=ntohs(tmpclearinghouse.port); if (msg_init(0)<0) { gslog(LOG_EMERG,"hostlib::error::could not init msgq\n"); return -1; } i.im.receiver = clearinghouseftaid; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_REGISTER; #ifdef PRINTMSG i.im.pmsgid=pmsgid; pmsgid++; gslog(LOG_EMERG,"send a message (%d.%u) to %u with op " "%u with size %u\n", i.im.pmsgid,i.im.receiver.port,i.im.operation, i.im.size); #endif if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) { /* we can wait her for an ack since nobody should know about us yet */ init_read_again: if ((res=msg_recv((gs_sp_t )&i, sizeof(struct internal_message), 1,0))>=0) { if (i.lowlevelop == LOWLEVELOP_ACK) { return 0; } else { gslog(LOG_EMERG,"hostlib::error::received unexpected message " "during initalization\n"); return -1; } } else { if (res<-1) { /* got an error here */ gslog(LOG_EMERG,"hostlib::error::received error message " "during initalization\n"); return -1; } else { goto init_read_again; } } } gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n"); return -1; } return 0; } static gs_retval_t gscpdetachshm(FTAID target) { struct internal_message i; i.im.receiver = target; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_FREE; if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) { /* no reason to wait here won't be acked anyway */ return -1; } return 0; } static gs_retval_t gscpdetachsocket(FTAID target) { struct internal_message i; i.im.receiver = target; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_UNREGISTER; if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) { /* no reason to wait here won't be acked anyway */ return -1; } return -1; } /* used to disassociate process from clearinghouse */ gs_retval_t gscpipc_free() { gs_int32_t x; /* XXX OS if this function is called when there are still subscribed FTAs for this process the clearinghouse will crash */ if (clearinghouse!=0) { return 0; } else { gs_int32_t x; for (x=0; x MAXMSGSZ) { gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n"); return -1; } i = (struct ipc_message *) b; i->receiver=f; i->sender=myid; i->operation=operation; i->size=length+sizeof(struct ipc_message); memcpy(&i->data[0],buf,length); #ifdef PRINTMSG i->pmsgid=pmsgid; pmsgid++; gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n", i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length); #endif if ((s=shmlist_find(f, SHM_RECV))!=0) { // set the hint in the ringbuffer that there is something on the shared memory queue s->buf->mqhint=1; } if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) { gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno); return -1; } return 0; } /* retrieve a message buf has to be at least of size MAXMSGSZ*/ gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t buf, gs_int32_t * size, gs_int32_t block) { gs_int32_t w; gs_int32_t x; gs_int8_t b[MAXSZ]; gs_int32_t y; struct internal_message * i; struct internal_remote_tuple * it; gs_int32_t length; struct shmlistentry * s; i=(struct internal_message *)b; it=(struct internal_remote_tuple *)b; for(y=0;(y < 10) || (block==1);y++) { shmlist_drain_remote(); length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1); if (length < -1) { /* problem */ return -1; } if (length < 0) { /* we are nonblocking and have nothing to do */ if (block==1) { // we are expected to block for ever if it is 0 or 2 we return continue; } return 0; } #ifdef PRINTMSG gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n", i->im.pmsgid, i->im.sender, i->im.sender,i->im.operation,i->im.size); #endif if (length <0) { gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno); return -1; } if (i->im.operation != RESERVED_FOR_LOW_LEVEL) { memcpy(buf,&(i->im.data[0]),i->im.size); *size=i->im.size-sizeof(struct ipc_message); *operation=i->im.operation; *f=i->im.sender; if ((s=shmlist_find(*f, SHM_SEND))!=0) { // clear the hint in the ringbuffer to indicate we got the message s->buf->mqhint=0; } return 1; } switch (i->lowlevelop) { case LOWLEVELOP_REGISTER: /* this should only get called if the process is the clearinghouse */ #ifdef PRINTMSG gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port); #endif send_ack(i->im.sender); break; case LOWLEVELOP_UNREGISTER: #ifdef PRINTMSG gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port); #endif /* remove connection */ for(x=0;xim.sender.ip) && (connectionhash[x].remoteid.port==i->im.sender.port)) { gslog(LOG_EMERG,"Close by remote request %u\n", connectionhash[x].remoteid.port); // XXX closed when the other process dies // can't close it yet since we might have // some more messages // close(connectionhash[x].socket); connectionhash[x].used=0; } } break; case LOWLEVELOP_SHM_REGISTER: { gs_int32_t shmid; struct ringbuf * r; struct shmid_ds sms; #ifdef PRINTMSG gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n", i->im.sender.port, i->shmtoken,i->shmsz); #endif if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) { if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) { gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno); send_nack(i->im.sender); } else { // Make sure all the momory gets mapped now for(x=0;xreader,r->writer,r->length,i->shmtoken); #endif if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken, shmid,r,i->shmsz)<0) { shmdt((gs_sp_t )r); shmctl(shmid,IPC_RMID,&sms); gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n"); send_nack(i->im.sender); } else { send_ack(i->im.sender); } } } else { gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno); send_nack(i->im.sender); } } break; case LOWLEVELOP_SHM_REMOTE_REGISTER: { gs_int32_t shmid; struct ringbuf * r; struct shmid_ds sms; #ifdef PRINTMSG gslog(LOG_EMERG,"request to get remote shm %u size %u\n", i->im.sender.port,i->shmsz); #endif if ((r=(struct ringbuf *)malloc(i->shmsz))==0) { gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno); send_nack(i->im.sender); } else { // make sure all the memory gets mapped now for(x=0;xreader=0; r->writer=0; r->length=i->shmsz; r->end= i->shmsz-MAXTUPLESZ; r->destid=i->im.receiver; r->srcid=i->im.sender; #ifdef PRINTMSG gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n" ,(void *)r,r->reader,r->writer,r->length,i->shmtoken); #endif if (shmlist_add(i->im.sender,SHM_SEND,0, 0,r,i->shmsz)<0) { gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n"); send_nack(i->im.sender); } else { send_ack(i->im.sender); } } } break; case LOWLEVELOP_SHM_FREE: { struct shmlistentry * sm; struct shmid_ds sms; #ifdef PRINTMSG gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender); #endif if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) { #ifdef PRINTMSG gslog(LOG_EMERG,"freeing %u",sm->shmid); #endif shmdt((gs_sp_t )sm->buf); shmctl(sm->shmid,IPC_RMID,&sms); shmlist_rm(i->im.sender,SHM_SEND); } } break; case LOWLEVELOP_SHM_REMOTE_FREE: { struct shmlistentry * sm; struct shmid_ds sms; #ifdef PRINTMSG gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender); #endif if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) { #ifdef PRINTMSG gslog(LOG_EMERG,"freeing %u",sm->shmid); #endif free((gs_sp_t ) sm->buf); shmlist_rm(i->im.sender,SHM_SEND); } } break; case LOWLEVELOP_REMOTE_TUPLE: { if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) { #ifdef PRINTMSG gslog(LOG_EMERG,"Received remote ringbuf message " "for message of size %u\n", it->size,it->im.size); #endif #ifdef BLOCKRINGBUFFER while (SPACETOWRITE(s->buf)==0) { usleep(1000); gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for."); } memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); ADVANCEWRITE(s->buf); #else if (SPACETOWRITE(s->buf)) { memcpy(CURWRITE(s->buf),&(it->data[0]),it->size); ADVANCEWRITE(s->buf); } else { // gslog(LOG_EMERG,"r+"); } #endif } else { gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n"); } } break; default: gslog(LOG_EMERG,"hostlib::error::unexpected message received\n"); return -1; } } return 0; } /* allocate a ringbuffer which allows receiving data from * the other process. returns 0 if didn't succeed and * returns an existing buffer if it exists */ struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length) { struct shmid_ds sms; struct shmlistentry * se; gs_int8_t keybuf[1024]; key_t shmtoken=0; gs_int32_t shmid=0; struct ringbuf * r; struct internal_message i; gs_int32_t x; se = shmlist_find(f, SHM_RECV); if (se) { return se->buf; } if (length<(4*MAXTUPLESZ)) { gslog(LOG_EMERG, "ERROR:buffersize in gscpipc_createshm has to be " "at least %u Bytes long\n", 4*MAXTUPLESZ); return 0; } if (myid.ip == f.ip) { sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port); if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1) { gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n"); return 0; } close(x); if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) { gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n"); return 0; } if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL| IPC_CREAT|IPC_EXCL))==-1) { gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno); return 0; } if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) { gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n"); return 0; } /* touch all memory once to map/reserve it now */ for(x=0;xreader=0; r->writer=0; r->length=length; r->end= r->length-MAXTUPLESZ; r->destid=f; r->srcid=myid; i.im.receiver = f; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_REGISTER; i.shmtoken = shmtoken; i.shmsz=length; if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) { if (wait_for_lowlevel_ack()<0) { shmdt((gs_sp_t )r); shmctl(shmid,IPC_RMID,&sms); return 0; } } shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */ if (shmlist_add(f, SHM_RECV, shmtoken, shmid, r, length) <0) { shmdt((gs_sp_t )r); shmctl(shmid,IPC_RMID,&sms); i.im.receiver = f; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_FREE; i.shmtoken = shmtoken; msg_send(f,(gs_sp_t )&i,i.im.size,1); return 0; } } else { /* remote shared memory */ if ((r=(struct ringbuf *)malloc(length))==0) { gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n"); return 0; } /* touch all memory once to map/reserve it now */ for(x=0;xreader=0; r->writer=0; r->length=length; r->end= r->length-MAXTUPLESZ; r->destid=f; r->srcid=myid; i.im.receiver = f; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER; i.shmtoken = shmtoken; i.shmsz=length; if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) { if (wait_for_lowlevel_ack()<0) { free(r); return 0; } } if (shmlist_add(f, SHM_RECV, 0, 0, r, length) <0) { free(r); i.im.receiver = f; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE; msg_send(f,(gs_sp_t )&i,i.im.size,1); return 0; } } return r; } /* finds a ringbuffer to send which was allocated by * gscpipc_creatshm and return 0 on an error */ struct ringbuf * gscpipc_getshm(FTAID f) { struct shmlistentry * se; gs_int32_t recmsgid; se = shmlist_find(f, SHM_SEND); if (se) { return se->buf; } return 0; } /* frees shared memory to a particular proccess identified * by MSGID */ gs_retval_t gscpipc_freeshm(FTAID f) { struct internal_message i; struct shmlistentry * sm; struct shmid_ds sms; if (myid.ip == f.ip) { if ((sm=shmlist_find(f, SHM_RECV)) <0) { shmdt((gs_sp_t )sm->buf); shmctl(sm->shmid,IPC_RMID,&sms); i.im.receiver = f; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_FREE; i.shmtoken = sm->shmtoken; msg_send(f,(gs_sp_t )&i,i.im.size,1); shmlist_rm(f, SHM_RECV); } } else { if ((sm=shmlist_find(f, SHM_RECV)) <0) { free((gs_sp_t )sm->buf); i.im.receiver = f; i.im.sender = myid; i.im.operation = RESERVED_FOR_LOW_LEVEL; i.im.size = sizeof(struct internal_message); i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE; msg_send(f,(gs_sp_t )&i,i.im.size,1); shmlist_rm(f, SHM_RECV); } } return 0; } gs_retval_t gscpipc_mqhint() { gs_int32_t x; for (x=0; xmqhint) { return 1; } } } return 0; }