1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
19 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
31 #include <sys/ioctl.h>
32 #include <netinet/tcp.h>
33 #include <linux/sockios.h>
37 #define socklen_t gs_uint32_t
41 struct FTAID clearinghouseftaid;
46 gs_int32_t socket; /* socket for connection */
47 gs_int32_t used; /* 1 if the entry is in use */
48 FTAID remoteid; /* remoteid of connection */
51 struct connection connectionhash[SOCKET_HASH_SZ];
53 gs_int32_t clearinghouse=0;
68 gs_int32_t shmlistlen=0;
69 struct shmlistentry shmlist[MAX_NUMBER_OF_SHM];
72 gs_int8_t buf[MAXMSGSZ];
77 static struct sq * sqtop=0;
78 static struct sq * sqtail=0;
80 /* adds a buffer to the end of the sidequeue*/
81 gs_retval_t gscpipc_sidequeue_append(gs_sp_t buf, gs_int32_t length)
84 if ((s=malloc(sizeof(struct sq)))==0) {
85 gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
88 memcpy(&s->buf[0],buf,MAXMSGSZ);
101 /* removes a buffer from the top of the sidequeue*/
102 gs_retval_t gscpipc_sidequeue_pop(gs_sp_t buf, gs_int32_t * length, gs_int32_t buflen)
106 if (sqtop->length > buflen) {
109 memcpy(buf,&sqtop->buf[0],sqtop->length);
110 *length=sqtop->length;
113 if (sqtop==0) sqtail=0;
123 gs_int32_t operation;
131 #define LOWLEVELOP_ACK 0
132 #define LOWLEVELOP_NACK 1
133 #define LOWLEVELOP_REGISTER 2
134 #define LOWLEVELOP_UNREGISTER 3
135 #define LOWLEVELOP_SHM_REGISTER 4
136 #define LOWLEVELOP_SHM_FREE 5
137 #define LOWLEVELOP_SHM_REMOTE_REGISTER 6
138 #define LOWLEVELOP_SHM_REMOTE_FREE 7
139 #define LOWLEVELOP_REMOTE_TUPLE 8
141 struct internal_message{
142 struct ipc_message im;
149 struct internal_remote_tuple{
150 struct ipc_message im;
164 struct shmlistentry * shmlist_find(FTAID msgid, int type)
167 for (x=0; x<shmlistlen; x++) {
168 if ((shmlist[x].msgid.ip == msgid.ip)
169 && (shmlist[x].msgid.port == msgid.port)
170 && (shmlist[x].type == type)) {
171 return &(shmlist[x]);
177 gs_retval_t shmlist_add(FTAID msgid, int type, key_t shmtoken,
178 int shmid, struct ringbuf * buf, int buffsize)
180 if (shmlist_find(msgid, type) !=0) {
183 if (shmlistlen>=MAX_NUMBER_OF_SHM) {
184 gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many"
188 shmlist[shmlistlen].msgid=msgid;
189 shmlist[shmlistlen].type=type;
190 shmlist[shmlistlen].shmtoken=shmtoken;
191 shmlist[shmlistlen].shmid=shmid;
192 shmlist[shmlistlen].buf=buf;
193 shmlist[shmlistlen].buffsize=buffsize;
198 static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type)
202 for (x=0;x<shmlistlen;x++) {
203 if ((shmlist[x].msgid.ip == msgid.ip)
204 && (shmlist[x].msgid.port == msgid.port)
205 &&(shmlist[x].type==type)) {
209 if ((move==1)&&(x<shmlistlen)) {
210 shmlist[x]=shmlist[x+1];
217 /* starts the listen socket for the current proccess
218 if the listen_port is 0 then a random port is assigned
219 this function also sets myid
221 static gs_retval_t msg_init(gs_uint32_t clearinghouse) {
222 struct sockaddr_in sin;
227 /* mark all entries in the connection hash as unused */
228 for(x=0;x<SOCKET_HASH_SZ;x++) {
229 connectionhash[x].used=0;
235 bzero(&sin, sizeof(sin));
236 sin.sin_family = AF_INET;
238 sin.sin_len = sizeof(sin);
240 sin.sin_addr.s_addr = 0;
243 if ((listen_socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
244 gslog(LOG_EMERG,"GSCMSGQ::error::could not create listen socket\n");
248 if (bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin)) < 0 ) {
249 gslog(LOG_EMERG,"GSCMSGQ::error::could not bind to socket for ip %x port %u with error %u \n",
250 ntohl(sin.sin_addr.s_addr), ntohs(sin.sin_port),errno);
254 if (listen(listen_socket, 64) < 0) {
255 gslog(LOG_EMERG,"GSCMSGQ::error::could not listen to socket for port %u \n",ntohs(sin.sin_port));
256 close(listen_socket);
260 if (getsockname(listen_socket, (struct sockaddr *) &sin, &sin_sz) < 0) {
261 gslog(LOG_EMERG,"GSCMSGQ::error::could not get local port number of listen socket\n");
265 myid.port=ntohs(sin.sin_port);
266 myid.ip=ntohl(sin.sin_addr.s_addr);
271 static void closeconnection(gs_int32_t x) {
272 if (connectionhash[x].used==1) {
273 close(connectionhash[x].socket);
274 connectionhash[x].used=0;
278 static gs_retval_t writeall(gs_int32_t socket, void * b, gs_int32_t sz) {
280 gs_sp_t buf = (gs_sp_t )b;
283 if ((rv=write(socket,buf,sz))<0) {
286 else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS
297 static gs_retval_t msg_send(FTAID id, gs_sp_t buf, gs_uint32_t len, gs_uint32_t block) {
298 struct sockaddr_in sin;
306 for(x=0;x<SOCKET_HASH_SZ;x++) {
307 if (connectionhash[x].used==1) {
308 if ((connectionhash[x].remoteid.ip==id.ip)
309 && (connectionhash[x].remoteid.port==id.port)) {
313 gs_int32_t datainbuffer;
314 if (ioctl(connectionhash[x].socket,SIOCOUTQ,&datainbuffer)<1) {
316 "GSCMSGQ::error::could not determin free "
317 "space in write buffer errno %u\n",errno);
320 if ((SOCK_BUF_SZ-datainbuffer) < (len+sizeof(gs_uint32_t))) {
325 // low water mark in setsockoption is supported
329 /* since we set the SNDLOWAT to MAXSZ+4 we know that if the write
330 * select call returns with a 1 for that file descriptor at least that
331 * much memory is available in the send buffer and we therefore
332 * won't block sending
335 FD_SET(connectionhash[x].socket,&fs);
336 n=connectionhash[x].socket+1;
339 if(select(n,0,&fs,0,&tv)!=1) {
345 gslog(LOG_EMERG,"\twriting %u",ntohl(sz));
347 ret = writeall(connectionhash[x].socket,&sz,sizeof(gs_uint32_t));
350 else if (ret != sizeof(gs_uint32_t)) {
351 gslog(LOG_EMERG,"GSCMSGQ::error::could not write length\n");
354 ret = writeall(connectionhash[x].socket,buf,len);
357 else if (ret != len) {
358 gslog(LOG_EMERG,"GSCMSGQ::error::could not write message\n");
362 gslog(LOG_EMERG,"...done\n");
370 /* ok we don't have a connection make one */
371 if ((u>=SOCKET_HASH_SZ) || (u<0)) {
372 gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"
373 " TCP connection limit sending %d\n",u);
376 connectionhash[u].remoteid=id;
377 connectionhash[u].remoteid.index=0;
378 connectionhash[u].remoteid.streamid=0;
379 sin.sin_family = AF_INET;
381 sin.sin_len = sizeof(sin);
383 sin.sin_addr.s_addr = htonl(id.ip);
384 sin.sin_port = htons(id.port);
385 if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
386 gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n");
389 if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) {
390 gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n");
394 if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF,
395 (gs_sp_t )&sz, sizeof(sz)) != 0) {
396 gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n");
400 if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF,
401 (gs_sp_t )&sz, sizeof(sz)) != 0) {
402 gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n");
406 // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead
408 if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT,
409 (gs_sp_t )&sz, sizeof(sz)) != 0) {
410 gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno);
415 if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE,
416 (gs_sp_t )&sz, sizeof(sz)) != 0) {
417 gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n");
420 connectionhash[u].used=1;
425 static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) {
427 gs_sp_t buf = (gs_sp_t )b;
430 if ((rv=read(socket,buf,sz))<0) {
433 gslog(LOG_EMERG,"read with error number %u \n",errno);
445 /* msg_recv return len if data is available -1 for a timeout and -2 for an error */
446 static gs_retval_t msg_recv(gs_sp_t buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) {
447 struct sockaddr_in sin;
455 static gs_int32_t last=0;
458 if (check_sideque==1) {
459 if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) {
463 gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n");
477 FD_SET(listen_socket,&fs);
479 for(x=0;x<SOCKET_HASH_SZ;x++) {
480 if (connectionhash[x].used==1) {
481 FD_SET(connectionhash[x].socket,&fs);
482 if (n<connectionhash[x].socket) {
483 n=connectionhash[x].socket;
489 sret=select(n,&fs,0,0,&tv);
490 if ((sret<0) && (errno!=EINTR)) {
491 gslog(LOG_EMERG,"Select with error %u\n",errno);
497 if (FD_ISSET(listen_socket,&fs)) {
498 for (x=0;(x<SOCKET_HASH_SZ) && (connectionhash[x].used !=0) ;x++);
499 if (x>=SOCKET_HASH_SZ) {
500 gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"
501 "TCP connection limit accepting\n");
505 if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl))
507 gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n");
512 if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF,
513 (gs_sp_t )&sz, sizeof(sz)) != 0) {
514 fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n");
518 if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF,
519 (gs_sp_t )&sz, sizeof(sz)) != 0) {
520 fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n");
525 if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) {
526 gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n");
527 close(connectionhash[x].socket);
530 connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr);
531 connectionhash[x].remoteid.port=ntohs(sin.sin_port);
532 connectionhash[x].remoteid.index=0;
533 connectionhash[x].remoteid.streamid=0;
534 connectionhash[x].used=1;
536 gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port);
540 for(x=0;x<SOCKET_HASH_SZ;x++) {
541 last=(last+1)%SOCKET_HASH_SZ;
542 if ((connectionhash[last].used==1) && (FD_ISSET(connectionhash[last].socket,&fs))){
545 gslog(LOG_EMERG,"reading sret:%d block:%u...",sret,block);
547 if ((rsz=readall(connectionhash[last].socket,&length,sizeof(gs_uint32_t)))!=sizeof(gs_uint32_t)) {
548 closeconnection(last);
549 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing %u res %d\n",
550 connectionhash[last].remoteid.port,rsz);
553 length=ntohl(length);
556 gslog(LOG_EMERG,"GSCMSGQ::error::message to long (%u) for receive buffer (%u)\n",length,
558 /* remove the data */
559 for(y=0;y<length;y++)
560 if (readall(connectionhash[last].socket,&d,1)!=1) {
561 closeconnection(last);
562 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing for receive buffer mismatch\n");
567 if (readall(connectionhash[last].socket,buf,length)!=length) {
568 closeconnection(last);
569 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing on data read\n");
573 gslog(LOG_EMERG,"reading done\n");
581 static gs_retval_t send_nack(FTAID recid) {
582 struct internal_message i;
583 i.im.receiver = recid;
585 i.im.operation = RESERVED_FOR_LOW_LEVEL;
586 i.im.size = sizeof(struct internal_message);
587 i.lowlevelop = LOWLEVELOP_NACK;
588 if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {
596 static gs_retval_t send_ack(FTAID recid) {
597 struct internal_message i;
598 i.im.receiver = recid;
600 i.im.operation = RESERVED_FOR_LOW_LEVEL;
601 i.im.size = sizeof(struct internal_message);
602 i.lowlevelop = LOWLEVELOP_ACK;
603 if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {
611 static gs_retval_t wait_for_lowlevel_ack() {
612 gs_int8_t b[MAXMSGSZ];
613 struct internal_message * i;
615 i=(struct internal_message *)b;
617 /* this is bussy waiting if there is another
618 message waiting to be processed so make sure it is only
619 used where bussy waiting is OK. If that becomes
620 a probelm add a local queue
624 if ((res=msg_recv(b, MAXMSGSZ,1,0))>0) {
625 if ((i->im.operation == RESERVED_FOR_LOW_LEVEL)
626 && (( i->lowlevelop == LOWLEVELOP_ACK)
627 || ( i->lowlevelop == LOWLEVELOP_NACK))) {
628 if ( i->lowlevelop == LOWLEVELOP_ACK) {
634 if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) {
635 struct internal_remote_tuple * it;
636 struct shmlistentry * s;
637 it=(struct internal_remote_tuple *)b;
639 if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {
641 gslog(LOG_EMERG,"Received remote ringbuf message "
642 "for message of size %u\n",
643 it->size,it->im.size);
645 #ifdef BLOCKRINGBUFFER
646 while (SPACETOWRITE(s->buf)==0) {
648 gslog(LOG_ERR,"Dead in the water we can't "
649 "drain the ringbuffer we wait for.");
651 memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
652 ADVANCEWRITE(s->buf);
654 if (SPACETOWRITE(s->buf)) {
655 memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
656 ADVANCEWRITE(s->buf);
658 // gslog(LOG_EMERG,"r+");
662 gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");
665 gscpipc_sidequeue_append(b, res);
669 /* got an error here */
670 gslog(LOG_EMERG,"hostlib::error::received error "
671 "during wait for low level ack\n");
680 void shmlist_drain_remote()
683 gs_int8_t buf[MAXSZ];
684 struct internal_remote_tuple * it;
685 it = (struct internal_remote_tuple *) buf;
686 for (x=0; x<shmlistlen; x++) {
687 if ((shmlist[x].msgid.ip != myid.ip)&& (shmlist[x].type==SHM_SEND)) {
688 while (UNREAD(shmlist[x].buf)) {
689 it->im.receiver = shmlist[x].msgid;
690 it->im.sender = myid;
691 it->im.operation = RESERVED_FOR_LOW_LEVEL;
692 it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE;
693 it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1;
694 it->im.size = sizeof(struct internal_remote_tuple)-4+it->size;
695 memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size);
697 gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n",
698 it->size, it->im.size);
700 if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) {
703 ADVANCEREAD(shmlist[x].buf);
710 *used to contact the clearinghouse process returns the MSGID of
711 * the current process
713 gs_retval_t gscpipc_init(gs_int32_t clearinghouse)
715 struct internal_message i;
719 endpoint tmpclearinghouse;
721 /* make sure priveleges can be set */
724 clearinghouseftaid.index=0;
725 clearinghouseftaid.streamid=0;
727 if (get_hub(&gshub)!=0) {
728 gslog(LOG_EMERG,"hostlib::error::could get hub\n");
732 if (clearinghouse!=0) {
733 // This is the clearinghouse
734 gs_int8_t buf[MAXMSGSZ];
737 gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");
741 clearinghouseftaid.ip=myid.ip;
742 clearinghouseftaid.port=myid.port;
744 tmpclearinghouse.ip=htonl(clearinghouseftaid.ip);
745 tmpclearinghouse.port=htons(clearinghouseftaid.port);
747 if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) {
748 gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance");
755 // This is an lfta/hfta/app
758 if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) {
759 gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n");
763 clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip);
764 clearinghouseftaid.port=ntohs(tmpclearinghouse.port);
768 gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");
772 i.im.receiver = clearinghouseftaid;
774 i.im.operation = RESERVED_FOR_LOW_LEVEL;
775 i.im.size = sizeof(struct internal_message);
776 i.lowlevelop = LOWLEVELOP_REGISTER;
780 gslog(LOG_EMERG,"send a message (%d.%u) to %u with op "
782 i.im.pmsgid,i.im.receiver.port,i.im.operation,
785 if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) {
786 /* we can wait her for an ack since nobody should know
789 if ((res=msg_recv((gs_sp_t )&i,
790 sizeof(struct internal_message),
792 if (i.lowlevelop == LOWLEVELOP_ACK) {
795 gslog(LOG_EMERG,"hostlib::error::received unexpected message "
796 "during initalization\n");
801 /* got an error here */
802 gslog(LOG_EMERG,"hostlib::error::received error message "
803 "during initalization\n");
806 goto init_read_again;
810 gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n");
817 static gs_retval_t gscpdetachshm(FTAID target)
819 struct internal_message i;
821 i.im.receiver = target;
823 i.im.operation = RESERVED_FOR_LOW_LEVEL;
824 i.im.size = sizeof(struct internal_message);
825 i.lowlevelop = LOWLEVELOP_SHM_FREE;
826 if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) {
827 /* no reason to wait here won't be acked anyway */
833 static gs_retval_t gscpdetachsocket(FTAID target)
835 struct internal_message i;
837 i.im.receiver = target;
839 i.im.operation = RESERVED_FOR_LOW_LEVEL;
840 i.im.size = sizeof(struct internal_message);
841 i.lowlevelop = LOWLEVELOP_UNREGISTER;
842 if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) {
843 /* no reason to wait here won't be acked anyway */
850 /* used to disassociate process from clearinghouse */
851 gs_retval_t gscpipc_free()
854 /* XXX OS if this function is called when there are still
855 subscribed FTAs for this process the clearinghouse will
859 if (clearinghouse!=0) {
863 for (x=0; x<shmlistlen; x++) {
864 if (shmlist[x].type==SHM_RECV) {
865 gscpdetachshm(shmlist[x].msgid);
866 if (shmdt((gs_sp_t )shmlist[x].buf)!=0) {
867 gslog(LOG_EMERG,"hostlib::error::could not "
868 "detach shared memory\n");
871 gslog(LOG_EMERG,"hostlib::error::porccess freed while still "
872 "attached to sending shared memory\n");
876 /* remove connection */
877 for(x=0;x<SOCKET_HASH_SZ;x++) {
878 if (connectionhash[x].used==1) {
879 //XXX detach does not work due to interleaved messages
880 // gscpdetachsocket(connectionhash[x].remoteid);
881 close(connectionhash[x].socket);
884 close(listen_socket);
888 /* returns MSGID of current process */
889 FTAID gscpipc_getftaid()
895 /* sends a message to a process */
896 gs_retval_t gscpipc_send(FTAID f, gs_int32_t operation, gs_sp_t buf, gs_int32_t length, gs_int32_t block)
898 gs_int8_t b[MAXMSGSZ];
899 struct ipc_message * i;
900 struct shmlistentry * s;
901 if (length > MAXMSGSZ) {
902 gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n");
905 i = (struct ipc_message *) b;
908 i->operation=operation;
909 i->size=length+sizeof(struct ipc_message);
910 memcpy(&i->data[0],buf,length);
914 gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n",
915 i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length);
917 if ((s=shmlist_find(f, SHM_RECV))!=0) {
918 // set the hint in the ringbuffer that there is something on the shared memory queue
921 if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) {
922 gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno);
928 /* retrieve a message buf has to be at least of size MAXMSGSZ*/
929 gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t buf, gs_int32_t * size, gs_int32_t block)
936 struct internal_message * i;
937 struct internal_remote_tuple * it;
940 struct shmlistentry * s;
942 i=(struct internal_message *)b;
943 it=(struct internal_remote_tuple *)b;
945 for(y=0;(y < 10) || (block==1);y++) {
946 shmlist_drain_remote();
947 length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1);
953 /* we are nonblocking and have nothing to do */
955 // we are expected to block for ever if it is 0 or 2 we return
961 gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n",
962 i->im.pmsgid, i->im.sender, i->im.sender,i->im.operation,i->im.size);
965 gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno);
968 if (i->im.operation != RESERVED_FOR_LOW_LEVEL) {
969 memcpy(buf,&(i->im.data[0]),i->im.size);
970 *size=i->im.size-sizeof(struct ipc_message);
971 *operation=i->im.operation;
973 if ((s=shmlist_find(*f, SHM_SEND))!=0) {
974 // clear the hint in the ringbuffer to indicate we got the message
979 switch (i->lowlevelop) {
980 case LOWLEVELOP_REGISTER:
981 /* this should only get called if the process is the clearinghouse */
983 gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port);
985 send_ack(i->im.sender);
987 case LOWLEVELOP_UNREGISTER:
989 gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port);
990 #endif /* remove connection */
991 for(x=0;x<SOCKET_HASH_SZ;x++) {
992 if ( (connectionhash[x].used==1)
993 && (connectionhash[x].remoteid.ip==i->im.sender.ip)
994 && (connectionhash[x].remoteid.port==i->im.sender.port)) {
995 gslog(LOG_EMERG,"Close by remote request %u\n",
996 connectionhash[x].remoteid.port);
997 // XXX closed when the other process dies
998 // can't close it yet since we might have
999 // some more messages
1000 // close(connectionhash[x].socket);
1001 connectionhash[x].used=0;
1005 case LOWLEVELOP_SHM_REGISTER:
1009 struct shmid_ds sms;
1011 gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n",
1013 i->shmtoken,i->shmsz);
1015 if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) {
1016 if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) {
1017 gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno);
1018 send_nack(i->im.sender);
1020 // Make sure all the momory gets mapped now
1021 for(x=0;x<length;x=x+1024) {
1022 ((gs_uint8_t *) r)[x]=0;
1026 gslog(LOG_EMERG,"Got a ring buffer at address %p (%u %u %u %u)\n"
1027 ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);
1029 if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken,
1030 shmid,r,i->shmsz)<0) {
1032 shmctl(shmid,IPC_RMID,&sms);
1033 gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n");
1034 send_nack(i->im.sender);
1036 send_ack(i->im.sender);
1040 gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno);
1041 send_nack(i->im.sender);
1045 case LOWLEVELOP_SHM_REMOTE_REGISTER:
1049 struct shmid_ds sms;
1051 gslog(LOG_EMERG,"request to get remote shm %u size %u\n",
1052 i->im.sender.port,i->shmsz);
1054 if ((r=(struct ringbuf *)malloc(i->shmsz))==0) {
1055 gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno);
1056 send_nack(i->im.sender);
1058 // make sure all the memory gets mapped now
1059 for(x=0;x<length;x=x+1024) {
1060 ((gs_uint8_t *) r)[x]=0;
1065 r->end= i->shmsz-MAXTUPLESZ;
1066 r->destid=i->im.receiver;
1067 r->srcid=i->im.sender;
1069 gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n"
1070 ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);
1072 if (shmlist_add(i->im.sender,SHM_SEND,0,
1074 gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n");
1075 send_nack(i->im.sender);
1077 send_ack(i->im.sender);
1082 case LOWLEVELOP_SHM_FREE:
1084 struct shmlistentry * sm;
1085 struct shmid_ds sms;
1087 gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);
1089 if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {
1091 gslog(LOG_EMERG,"freeing %u",sm->shmid);
1093 shmdt((gs_sp_t )sm->buf);
1094 shmctl(sm->shmid,IPC_RMID,&sms);
1095 shmlist_rm(i->im.sender,SHM_SEND);
1099 case LOWLEVELOP_SHM_REMOTE_FREE:
1101 struct shmlistentry * sm;
1102 struct shmid_ds sms;
1104 gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);
1106 if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {
1108 gslog(LOG_EMERG,"freeing %u",sm->shmid);
1110 free((gs_sp_t ) sm->buf);
1111 shmlist_rm(i->im.sender,SHM_SEND);
1115 case LOWLEVELOP_REMOTE_TUPLE:
1117 if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {
1119 gslog(LOG_EMERG,"Received remote ringbuf message "
1120 "for message of size %u\n",
1121 it->size,it->im.size);
1123 #ifdef BLOCKRINGBUFFER
1124 while (SPACETOWRITE(s->buf)==0) {
1126 gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for.");
1128 memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
1129 ADVANCEWRITE(s->buf);
1131 if (SPACETOWRITE(s->buf)) {
1132 memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
1133 ADVANCEWRITE(s->buf);
1135 // gslog(LOG_EMERG,"r+");
1139 gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");
1144 gslog(LOG_EMERG,"hostlib::error::unexpected message received\n");
1151 /* allocate a ringbuffer which allows receiving data from
1152 * the other process. returns 0 if didn't succeed and
1153 * returns an existing buffer if it exists */
1154 struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length)
1156 struct shmid_ds sms;
1157 struct shmlistentry * se;
1158 gs_int8_t keybuf[1024];
1162 struct internal_message i;
1165 se = shmlist_find(f, SHM_RECV);
1170 if (length<(4*MAXTUPLESZ)) {
1172 "ERROR:buffersize in gscpipc_createshm has to be "
1173 "at least %u Bytes long\n",
1178 if (myid.ip == f.ip) {
1179 sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port);
1180 if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1) {
1181 gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n");
1186 if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) {
1187 gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n");
1191 if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL|
1192 IPC_CREAT|IPC_EXCL))==-1) {
1193 gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno);
1196 if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) {
1197 gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n");
1200 /* touch all memory once to map/reserve it now */
1201 for(x=0;x<length;x=x+1024) {
1202 ((gs_uint8_t *) r)[x]=0;
1207 r->end= r->length-MAXTUPLESZ;
1212 i.im.operation = RESERVED_FOR_LOW_LEVEL;
1213 i.im.size = sizeof(struct internal_message);
1214 i.lowlevelop = LOWLEVELOP_SHM_REGISTER;
1215 i.shmtoken = shmtoken;
1217 if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {
1218 if (wait_for_lowlevel_ack()<0) {
1220 shmctl(shmid,IPC_RMID,&sms);
1224 shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */
1225 if (shmlist_add(f, SHM_RECV, shmtoken,
1226 shmid, r, length) <0) {
1228 shmctl(shmid,IPC_RMID,&sms);
1231 i.im.operation = RESERVED_FOR_LOW_LEVEL;
1232 i.im.size = sizeof(struct internal_message);
1233 i.lowlevelop = LOWLEVELOP_SHM_FREE;
1234 i.shmtoken = shmtoken;
1235 msg_send(f,(gs_sp_t )&i,i.im.size,1);
1239 /* remote shared memory */
1240 if ((r=(struct ringbuf *)malloc(length))==0) {
1241 gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n");
1244 /* touch all memory once to map/reserve it now */
1245 for(x=0;x<length;x=x+1024) {
1246 ((gs_uint8_t *) r)[x]=0;
1251 r->end= r->length-MAXTUPLESZ;
1256 i.im.operation = RESERVED_FOR_LOW_LEVEL;
1257 i.im.size = sizeof(struct internal_message);
1258 i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER;
1259 i.shmtoken = shmtoken;
1261 if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {
1262 if (wait_for_lowlevel_ack()<0) {
1267 if (shmlist_add(f, SHM_RECV, 0,
1272 i.im.operation = RESERVED_FOR_LOW_LEVEL;
1273 i.im.size = sizeof(struct internal_message);
1274 i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;
1275 msg_send(f,(gs_sp_t )&i,i.im.size,1);
1283 /* finds a ringbuffer to send which was allocated by
1284 * gscpipc_creatshm and return 0 on an error */
1286 struct ringbuf * gscpipc_getshm(FTAID f)
1288 struct shmlistentry * se;
1289 gs_int32_t recmsgid;
1290 se = shmlist_find(f, SHM_SEND);
1297 /* frees shared memory to a particular proccess identified
1300 gs_retval_t gscpipc_freeshm(FTAID f)
1302 struct internal_message i;
1303 struct shmlistentry * sm;
1304 struct shmid_ds sms;
1305 if (myid.ip == f.ip) {
1306 if ((sm=shmlist_find(f, SHM_RECV)) <0) {
1307 shmdt((gs_sp_t )sm->buf);
1308 shmctl(sm->shmid,IPC_RMID,&sms);
1311 i.im.operation = RESERVED_FOR_LOW_LEVEL;
1312 i.im.size = sizeof(struct internal_message);
1313 i.lowlevelop = LOWLEVELOP_SHM_FREE;
1314 i.shmtoken = sm->shmtoken;
1315 msg_send(f,(gs_sp_t )&i,i.im.size,1);
1316 shmlist_rm(f, SHM_RECV);
1319 if ((sm=shmlist_find(f, SHM_RECV)) <0) {
1320 free((gs_sp_t )sm->buf);
1323 i.im.operation = RESERVED_FOR_LOW_LEVEL;
1324 i.im.size = sizeof(struct internal_message);
1325 i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;
1326 msg_send(f,(gs_sp_t )&i,i.im.size,1);
1327 shmlist_rm(f, SHM_RECV);
1333 gs_retval_t gscpipc_mqhint()
1336 for (x=0; x<shmlistlen; x++) {
1337 if (shmlist[x].type == SHM_SEND) {
1338 if (shmlist[x].buf->mqhint) {