Fixed newline characters throughout the code
[com/gs-lite.git] / src / lib / gscphost / gscpipc.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include "gsconfig.h"
16 #include "gstypes.h"
17 #include "gscpipc.h"
18 #include "gshub.h"
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <sys/ipc.h>
22 #include <sys/shm.h>
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <fcntl.h>
26 #include <errno.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29
30 #ifdef __linux__
31 #include <sys/ioctl.h>
32 #include <netinet/tcp.h>
33 #include <linux/sockios.h>
34 #endif
35
36 #ifndef socklen_t
37 #define socklen_t gs_uint32_t
38 #endif
39
40
41 struct FTAID clearinghouseftaid;
42
43
44
45 struct connection {
46     gs_int32_t socket; /* socket for connection */
47     gs_int32_t used;   /* 1 if the entry is in use */
48     FTAID remoteid; /* remoteid of connection */
49 };
50
51 struct connection connectionhash[SOCKET_HASH_SZ];
52
53 gs_int32_t clearinghouse=0;
54
55 #define SHMTYPE 's'
56 #define SHM_RECV 1
57 #define SHM_SEND 2
58
59 struct shmlistentry {
60     FTAID msgid;
61     gs_int32_t type;
62     key_t shmtoken;
63     gs_int32_t shmid;
64     struct ringbuf * buf;
65     gs_int32_t buffsize;
66 };
67
68 gs_int32_t shmlistlen=0;
69 struct shmlistentry shmlist[MAX_NUMBER_OF_SHM];
70
71 struct sq {
72     gs_int8_t buf[MAXMSGSZ];
73     gs_int32_t length;
74     struct sq * next;
75 };
76
77 static struct sq * sqtop=0;
78 static struct sq * sqtail=0;
79
80 /* adds a buffer to the end of the sidequeue*/
81 gs_retval_t gscpipc_sidequeue_append(gs_sp_t  buf, gs_int32_t length)
82 {
83     struct sq * s;
84     if ((s=malloc(sizeof(struct sq)))==0) {
85         gslog(LOG_EMERG,"Could not allocate memory for sidequeue");
86         return -1;
87     }
88     memcpy(&s->buf[0],buf,MAXMSGSZ);
89     s->length=length;
90     s->next=0;
91     if (sqtail) {
92         sqtail->next=s;
93         sqtail=s;
94     } else {
95         sqtop = s;
96         sqtail = s;
97     }
98     return 0;
99 }
100
101 /* removes a buffer from the top of the sidequeue*/
102 gs_retval_t gscpipc_sidequeue_pop(gs_sp_t  buf, gs_int32_t * length, gs_int32_t buflen)
103 {
104     struct sq * s;
105     if (sqtop) {
106         if (sqtop->length > buflen) {
107             return -2;
108         }
109         memcpy(buf,&sqtop->buf[0],sqtop->length);
110         *length=sqtop->length;
111         s=sqtop;
112         sqtop=sqtop->next;
113         if (sqtop==0) sqtail=0;
114         free(s);
115         return 0;
116     }
117     return -1;
118 }
119
120 struct ipc_message {
121     FTAID receiver;
122     FTAID sender;
123     gs_int32_t operation;
124 #ifdef PRINTMSG
125     gs_int32_t pmsgid;
126 #endif
127     gs_int32_t  size;
128     gs_int8_t data[4];
129 };
130
131 #define LOWLEVELOP_ACK 0
132 #define LOWLEVELOP_NACK 1
133 #define LOWLEVELOP_REGISTER 2
134 #define LOWLEVELOP_UNREGISTER 3
135 #define LOWLEVELOP_SHM_REGISTER 4
136 #define LOWLEVELOP_SHM_FREE 5
137 #define LOWLEVELOP_SHM_REMOTE_REGISTER 6
138 #define LOWLEVELOP_SHM_REMOTE_FREE 7
139 #define LOWLEVELOP_REMOTE_TUPLE 8
140
141 struct internal_message{
142     struct ipc_message im;
143     int lowlevelop;
144     key_t shmtoken;
145     int shmsz;
146     int result;
147 };
148
149 struct internal_remote_tuple{
150     struct ipc_message im;
151     int lowlevelop;
152     int size;
153     gs_int8_t data[4];
154 };
155
156 FTAID myid;
157 int listen_socket;
158
159 #ifdef PRINTMSG
160 int pmsgid=0;
161 #endif
162
163
164 struct shmlistentry * shmlist_find(FTAID msgid, int type)
165 {
166     int x;
167     for (x=0; x<shmlistlen; x++) {
168         if ((shmlist[x].msgid.ip == msgid.ip)
169             && (shmlist[x].msgid.port == msgid.port)
170             && (shmlist[x].type == type)) {
171             return &(shmlist[x]);
172         }
173     }
174     return 0;
175 }
176
177 gs_retval_t shmlist_add(FTAID msgid, int type, key_t shmtoken,
178                         int shmid, struct ringbuf * buf, int buffsize)
179 {
180     if (shmlist_find(msgid, type) !=0) {
181         return -1;
182     }
183     if (shmlistlen>=MAX_NUMBER_OF_SHM) {
184         gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many"
185               "shm registered\n");
186         return -1;
187     }
188     shmlist[shmlistlen].msgid=msgid;
189     shmlist[shmlistlen].type=type;
190     shmlist[shmlistlen].shmtoken=shmtoken;
191     shmlist[shmlistlen].shmid=shmid;
192     shmlist[shmlistlen].buf=buf;
193     shmlist[shmlistlen].buffsize=buffsize;
194     shmlistlen++;
195     return 0;
196 }
197
198 static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type)
199 {
200     gs_int32_t x;
201     gs_int32_t move=0;
202     for (x=0;x<shmlistlen;x++) {
203         if ((shmlist[x].msgid.ip == msgid.ip)
204             && (shmlist[x].msgid.port == msgid.port)
205             &&(shmlist[x].type==type)) {
206             shmlistlen--;
207             move=1;
208         }
209         if ((move==1)&&(x<shmlistlen)) {
210             shmlist[x]=shmlist[x+1];
211         }
212     }
213     return 0;
214 }
215
216
217 /* starts the listen socket for the current proccess
218  if the listen_port is 0 then a random port is assigned
219  this function also sets myid
220  */
221 static gs_retval_t msg_init(gs_uint32_t clearinghouse) {
222     struct sockaddr_in sin;
223     gs_int32_t x;
224     FILE *f;
225     socklen_t sin_sz;
226     
227     /* mark all entries in the connection hash as unused */
228     for(x=0;x<SOCKET_HASH_SZ;x++) {
229         connectionhash[x].used=0;
230     }
231     
232     myid.index=0;
233     myid.streamid=0;
234     
235     bzero(&sin, sizeof(sin));
236     sin.sin_family = AF_INET;
237 #ifdef __NetBSD__
238     sin.sin_len = sizeof(sin);
239 #endif
240     sin.sin_addr.s_addr = 0;
241     sin.sin_port = 0;
242     
243     if ((listen_socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
244         gslog(LOG_EMERG,"GSCMSGQ::error::could not create listen socket\n");
245         return -1;
246     }
247     
248     if (bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin)) < 0 ) {
249         gslog(LOG_EMERG,"GSCMSGQ::error::could not bind to socket for ip %x port %u with error %u \n",
250               ntohl(sin.sin_addr.s_addr), ntohs(sin.sin_port),errno);
251         return -1;
252     }
253     
254     if (listen(listen_socket, 64) < 0) {
255         gslog(LOG_EMERG,"GSCMSGQ::error::could not listen to socket for port %u \n",ntohs(sin.sin_port));
256         close(listen_socket);
257         return -1;
258     }
259     sin_sz=sizeof(sin);
260     if (getsockname(listen_socket, (struct sockaddr *) &sin, &sin_sz) < 0) {
261         gslog(LOG_EMERG,"GSCMSGQ::error::could not get local port number of listen socket\n");
262         return -1;
263     }
264     
265     myid.port=ntohs(sin.sin_port);
266     myid.ip=ntohl(sin.sin_addr.s_addr);
267     
268     return 0;
269 }
270
271 static void closeconnection(gs_int32_t x) {
272     if (connectionhash[x].used==1)  {
273         close(connectionhash[x].socket);
274         connectionhash[x].used=0;
275     }
276 }
277
278 static gs_retval_t writeall(gs_int32_t socket, void * b, gs_int32_t sz) {
279     gs_int32_t rv;
280     gs_sp_t  buf = (gs_sp_t )b;
281     gs_int32_t res=sz;
282     while(sz>0) {
283         if ((rv=write(socket,buf,sz))<0) {
284             if (errno == EINTR)
285                 continue;
286             else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS
287                 return 0;
288             else
289                 return -1;
290         }
291         sz-=rv;
292         buf+=rv;
293     }
294     return res;
295 }
296
297 static gs_retval_t msg_send(FTAID id, gs_sp_t  buf, gs_uint32_t len, gs_uint32_t block) {
298     struct sockaddr_in sin;
299     gs_int32_t x;
300     gs_int32_t u;
301     gs_int32_t sz;
302     gs_int32_t ret;
303     
304 try_send_again:
305     u=-1;
306     for(x=0;x<SOCKET_HASH_SZ;x++) {
307         if (connectionhash[x].used==1) {
308             if ((connectionhash[x].remoteid.ip==id.ip)
309                 && (connectionhash[x].remoteid.port==id.port)) {
310                 sz=htonl(len);
311                 if (block==0) {
312 #ifdef __linux__
313                     gs_int32_t datainbuffer;
314                     if (ioctl(connectionhash[x].socket,SIOCOUTQ,&datainbuffer)<1) {
315                         gslog(LOG_EMERG,
316                               "GSCMSGQ::error::could not determin free "
317                               "space in write buffer errno %u\n",errno);
318                         return -1;
319                     }
320                     if ((SOCK_BUF_SZ-datainbuffer) < (len+sizeof(gs_uint32_t))) {
321                         return 1;
322                     }
323                     
324 #else
325                     // low water mark in setsockoption is supported
326                     fd_set fs;
327                     gs_int32_t n;
328                     struct timeval tv;
329                     /* since we set the SNDLOWAT to MAXSZ+4 we know that if the write
330                      * select call returns with a 1 for that file descriptor at least that
331                      * much memory is available in the send buffer and we therefore
332                      * won't block sending
333                      */
334                     FD_ZERO(&fs);
335                     FD_SET(connectionhash[x].socket,&fs);
336                     n=connectionhash[x].socket+1;
337                     tv.tv_sec = 0;
338                     tv.tv_usec = 0;
339                     if(select(n,0,&fs,0,&tv)!=1) {
340                         return 1;
341                     }
342 #endif
343                 }
344 #ifdef PRINTMSG
345                 gslog(LOG_EMERG,"\twriting %u",ntohl(sz));
346 #endif
347                 ret = writeall(connectionhash[x].socket,&sz,sizeof(gs_uint32_t));
348                 if (!ret)
349                     return 1;
350                 else if (ret != sizeof(gs_uint32_t)) {
351                     gslog(LOG_EMERG,"GSCMSGQ::error::could not write length\n");
352                     return -1;
353                 }
354                 ret = writeall(connectionhash[x].socket,buf,len);
355                 if (!ret)
356                     return 1;
357                 else if (ret != len) {
358                     gslog(LOG_EMERG,"GSCMSGQ::error::could not write message\n");
359                     return -1;
360                 }
361 #ifdef PRINTMSG
362                 gslog(LOG_EMERG,"...done\n");
363 #endif
364                 return 0;
365             }
366         } else {
367             if (u==-1) { u=x; }
368         }
369     }
370     /* ok we don't have a connection make one */
371     if ((u>=SOCKET_HASH_SZ) || (u<0)) {
372         gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"
373               " TCP connection limit sending %d\n",u);
374         return -1;
375     }
376     connectionhash[u].remoteid=id;
377     connectionhash[u].remoteid.index=0;
378     connectionhash[u].remoteid.streamid=0;
379     sin.sin_family = AF_INET;
380 #ifdef __NetBSD__
381     sin.sin_len = sizeof(sin);
382 #endif
383     sin.sin_addr.s_addr = htonl(id.ip);
384     sin.sin_port = htons(id.port);
385     if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
386         gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n");
387         return -1;
388     }
389     if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) {
390         gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n");
391         return -1;
392     }
393     sz=SOCK_BUF_SZ;
394     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF,
395                    (gs_sp_t )&sz, sizeof(sz)) != 0) {
396         gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n");
397         return -1;
398     }
399     sz=SOCK_BUF_SZ;
400     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF,
401                    (gs_sp_t )&sz, sizeof(sz)) != 0) {
402         gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n");
403         return -1;
404     }
405 #ifndef __linux__
406     // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead
407     sz=MAXSZ+4;
408     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT,
409                    (gs_sp_t )&sz, sizeof(sz)) != 0) {
410         gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno);
411         return -1;
412     }
413 #endif
414     sz=1;
415     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE,
416                    (gs_sp_t )&sz, sizeof(sz)) != 0) {
417         gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n");
418         return -1;
419     }
420     connectionhash[u].used=1;
421     goto try_send_again;
422     return -1;
423 }
424
425 static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) {
426     gs_int32_t rv;
427     gs_sp_t  buf = (gs_sp_t )b;
428     gs_int32_t res=sz;
429     while(sz>0) {
430         if ((rv=read(socket,buf,sz))<0) {
431             if (errno == EINTR)
432                 continue;
433             gslog(LOG_EMERG,"read with error number %u \n",errno);
434             return -1;
435         }
436         if (rv==0) {
437             return 0;
438         }
439         sz-=rv;
440         buf+=rv;
441     }
442     return res;
443 }
444
445 /* msg_recv return len if data is available -1 for a timeout and -2 for an error */
446 static gs_retval_t msg_recv(gs_sp_t  buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) {
447     struct sockaddr_in sin;
448     socklen_t sl;
449     fd_set fs;
450     gs_int32_t x,y;
451     gs_int32_t n;
452     gs_int32_t length;
453     struct timeval tv;
454     gs_int32_t sret;
455     static gs_int32_t last=0;
456     gs_uint32_t sz;
457     
458     if (check_sideque==1) {
459         if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) {
460             return length;
461         }
462         if (x==-2) {
463             gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n");
464             return -2;
465         }
466     }
467     
468 read_again:
469     if (block==0) {
470         tv.tv_sec = 0;
471         tv.tv_usec = 0;
472     } else {
473         tv.tv_sec = 0;
474         tv.tv_usec = 100000;
475     }
476     FD_ZERO(&fs);
477     FD_SET(listen_socket,&fs);
478     n=listen_socket;
479     for(x=0;x<SOCKET_HASH_SZ;x++) {
480         if (connectionhash[x].used==1) {
481             FD_SET(connectionhash[x].socket,&fs);
482             if (n<connectionhash[x].socket) {
483                 n=connectionhash[x].socket;
484             }
485         }
486     }
487     n=n+1;
488     // now block
489     sret=select(n,&fs,0,0,&tv);
490     if ((sret<0) && (errno!=EINTR)) {
491         gslog(LOG_EMERG,"Select with error %u\n",errno);
492         return -2;
493     }
494     if (sret<=0) {
495         return -1;
496     }
497     if (FD_ISSET(listen_socket,&fs)) {
498         for (x=0;(x<SOCKET_HASH_SZ) && (connectionhash[x].used !=0) ;x++);
499         if (x>=SOCKET_HASH_SZ) {
500             gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"
501                   "TCP connection limit accepting\n");
502             goto read_again;
503         }
504         sl=sizeof(sin);
505         if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl))
506                     < 0) {
507             gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n");
508             goto read_again;
509         }
510         
511         sz=SOCK_BUF_SZ;
512         if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF,
513                        (gs_sp_t )&sz, sizeof(sz)) != 0) {
514             fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n");
515             return -1;
516         }
517         sz=SOCK_BUF_SZ;
518         if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF,
519                        (gs_sp_t )&sz, sizeof(sz)) != 0) {
520             fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n");
521             return -1;
522         }
523         
524         sl=sizeof(sin);
525         if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) {
526             gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n");
527             close(connectionhash[x].socket);
528             goto read_again;
529         }
530         connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr);
531         connectionhash[x].remoteid.port=ntohs(sin.sin_port);
532         connectionhash[x].remoteid.index=0;
533         connectionhash[x].remoteid.streamid=0;
534         connectionhash[x].used=1;
535 #ifdef PRINTMSG
536         gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port);
537 #endif
538         goto read_again;
539     }
540     for(x=0;x<SOCKET_HASH_SZ;x++) {
541                 last=(last+1)%SOCKET_HASH_SZ;
542         if ((connectionhash[last].used==1) && (FD_ISSET(connectionhash[last].socket,&fs))){
543             gs_int32_t rsz;
544 #ifdef PRINTMSG
545             gslog(LOG_EMERG,"reading sret:%d block:%u...",sret,block);
546 #endif
547             if ((rsz=readall(connectionhash[last].socket,&length,sizeof(gs_uint32_t)))!=sizeof(gs_uint32_t)) {
548                 closeconnection(last);
549                 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing %u res %d\n",
550                       connectionhash[last].remoteid.port,rsz);
551                 continue;
552             }
553             length=ntohl(length);
554             if (buflen<length) {
555                 gs_int8_t d;
556                 gslog(LOG_EMERG,"GSCMSGQ::error::message to long (%u) for receive buffer (%u)\n",length,
557                       buflen);
558                 /* remove the data */
559                 for(y=0;y<length;y++)
560                     if (readall(connectionhash[last].socket,&d,1)!=1) {
561                         closeconnection(last);
562                         gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing for receive buffer mismatch\n");
563                         return -2;
564                     }
565                 return -2;
566             }
567             if (readall(connectionhash[last].socket,buf,length)!=length) {
568                 closeconnection(last);
569                 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing on data read\n");
570                 continue;
571             }
572 #ifdef PRINTMSG
573             gslog(LOG_EMERG,"reading done\n");
574 #endif
575             return length;
576         }
577     }
578     return -1;
579 }
580
581 static gs_retval_t send_nack(FTAID recid) {
582     struct internal_message i;
583     i.im.receiver = recid;
584     i.im.sender = myid;
585     i.im.operation = RESERVED_FOR_LOW_LEVEL;
586     i.im.size = sizeof(struct internal_message);
587     i.lowlevelop = LOWLEVELOP_NACK;
588     if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {
589         return 0;
590     } else {
591         return -1;
592     }
593     return -1;
594 }
595
596 static gs_retval_t send_ack(FTAID recid) {
597     struct internal_message i;
598     i.im.receiver = recid;
599     i.im.sender = myid;
600     i.im.operation = RESERVED_FOR_LOW_LEVEL;
601     i.im.size = sizeof(struct internal_message);
602     i.lowlevelop = LOWLEVELOP_ACK;
603     if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {
604         return 0;
605     } else {
606         return -1;
607     }
608     return -1;
609 }
610
611 static gs_retval_t wait_for_lowlevel_ack() {
612     gs_int8_t b[MAXMSGSZ];
613     struct internal_message * i;
614     gs_int32_t res;
615     i=(struct internal_message *)b;
616     
617     /* this is bussy waiting if there is another
618      message waiting to be processed so make sure it is only
619      used where bussy waiting is OK. If that becomes
620      a probelm add a local queue
621      */
622     
623     while( 1==1) {
624         if ((res=msg_recv(b, MAXMSGSZ,1,0))>0) {
625             if ((i->im.operation == RESERVED_FOR_LOW_LEVEL)
626                 && (( i->lowlevelop ==  LOWLEVELOP_ACK)
627                     || ( i->lowlevelop ==  LOWLEVELOP_NACK))) {
628                     if ( i->lowlevelop ==  LOWLEVELOP_ACK) {
629                         return 0;
630                     } else {
631                         return 1;
632                     }
633                 }
634                         if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) {
635                                 struct internal_remote_tuple * it;
636                                 struct shmlistentry * s;
637                                 it=(struct internal_remote_tuple *)b;
638                 
639                                 if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {
640 #ifdef PRINTMSG
641                                         gslog(LOG_EMERG,"Received remote ringbuf message "
642                           "for message of size %u\n",
643                           it->size,it->im.size);
644 #endif
645 #ifdef BLOCKRINGBUFFER
646                                         while (SPACETOWRITE(s->buf)==0) {
647                                                 usleep(1000);
648                                                 gslog(LOG_ERR,"Dead in the water we can't "
649                               "drain the ringbuffer we wait for.");
650                                         }
651                                         memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
652                                         ADVANCEWRITE(s->buf);
653 #else
654                                         if (SPACETOWRITE(s->buf)) {
655                                                 memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
656                                                 ADVANCEWRITE(s->buf);
657                                         } else {
658                         //                                              gslog(LOG_EMERG,"r+");
659                                         }
660 #endif
661                                 } else {
662                                         gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");
663                                 }
664                         } else {
665                                 gscpipc_sidequeue_append(b, res);
666                         }
667         }  else {
668             if (res < -1) {
669                 /* got an error here */
670                 gslog(LOG_EMERG,"hostlib::error::received error "
671                       "during wait for low level ack\n");
672                 return -1;
673             }
674         }
675     }
676     /* never reached */
677     return -1;
678 }
679
680 void shmlist_drain_remote()
681 {
682     gs_int32_t x;
683     gs_int8_t buf[MAXSZ];
684     struct internal_remote_tuple * it;
685     it = (struct internal_remote_tuple *) buf;
686     for (x=0; x<shmlistlen; x++) {
687         if ((shmlist[x].msgid.ip != myid.ip)&& (shmlist[x].type==SHM_SEND)) {
688             while (UNREAD(shmlist[x].buf)) {
689                 it->im.receiver = shmlist[x].msgid;
690                 it->im.sender = myid;
691                 it->im.operation = RESERVED_FOR_LOW_LEVEL;
692                 it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE;
693                 it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1;
694                 it->im.size = sizeof(struct internal_remote_tuple)-4+it->size;
695                 memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size);
696 #ifdef PRINTMSG
697                 gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n",
698                       it->size, it->im.size);
699 #endif
700                 if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) {
701                     break;
702                 }
703                 ADVANCEREAD(shmlist[x].buf);
704             }
705         }
706     }
707 }
708
709 /*
710  *used to contact the clearinghouse process returns the MSGID of
711  * the current process
712  */
713 gs_retval_t gscpipc_init(gs_int32_t clearinghouse)
714 {
715     struct internal_message i;
716     key_t msqtoken=0;
717     gs_int32_t x;
718     endpoint gshub;
719     endpoint tmpclearinghouse;
720     
721     /* make sure priveleges can be set */
722     umask(0);
723     
724     clearinghouseftaid.index=0;
725     clearinghouseftaid.streamid=0;
726     
727     if (get_hub(&gshub)!=0) {
728         gslog(LOG_EMERG,"hostlib::error::could get hub\n");
729         return -1;
730     }
731     
732     if (clearinghouse!=0) {
733         // This is the clearinghouse
734         gs_int8_t buf[MAXMSGSZ];
735         
736         if (msg_init(1)<0) {
737             gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");
738             return -1;
739         }
740         
741         clearinghouseftaid.ip=myid.ip;
742         clearinghouseftaid.port=myid.port;
743         
744         tmpclearinghouse.ip=htonl(clearinghouseftaid.ip);
745         tmpclearinghouse.port=htons(clearinghouseftaid.port);
746         
747         if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) {
748             gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance");
749             return -1;
750         }
751         
752         return 0;
753         
754     } else {
755         // This is an lfta/hfta/app
756         gs_int32_t res;
757         
758         if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) {
759             gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n");
760             return -1;
761         }
762         
763         clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip);
764         clearinghouseftaid.port=ntohs(tmpclearinghouse.port);
765         
766         
767         if (msg_init(0)<0) {
768             gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");
769             return -1;
770         }
771         
772         i.im.receiver = clearinghouseftaid;
773         i.im.sender = myid;
774         i.im.operation = RESERVED_FOR_LOW_LEVEL;
775         i.im.size = sizeof(struct internal_message);
776         i.lowlevelop = LOWLEVELOP_REGISTER;
777 #ifdef PRINTMSG
778         i.im.pmsgid=pmsgid;
779         pmsgid++;
780         gslog(LOG_EMERG,"send a message (%d.%u) to %u with op "
781               "%u with size %u\n",
782               i.im.pmsgid,i.im.receiver.port,i.im.operation,
783               i.im.size);
784 #endif
785         if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) {
786             /* we can wait her for an ack since nobody should know
787              about us yet */
788         init_read_again:
789             if ((res=msg_recv((gs_sp_t )&i,
790                               sizeof(struct internal_message),
791                               1,0))>=0) {
792                 if (i.lowlevelop == LOWLEVELOP_ACK) {
793                     return 0;
794                 } else {
795                     gslog(LOG_EMERG,"hostlib::error::received unexpected message "
796                           "during initalization\n");
797                     return -1;
798                 }
799             } else {
800                 if (res<-1) {
801                     /* got an error here */
802                     gslog(LOG_EMERG,"hostlib::error::received error message "
803                           "during initalization\n");
804                     return -1;
805                 } else {
806                     goto init_read_again;
807                 }
808             }
809         }
810         gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n");
811         return -1;
812     }
813     return 0;
814 }
815
816
817 static gs_retval_t gscpdetachshm(FTAID target)
818 {
819     struct internal_message i;
820     
821     i.im.receiver = target;
822     i.im.sender = myid;
823     i.im.operation = RESERVED_FOR_LOW_LEVEL;
824     i.im.size = sizeof(struct internal_message);
825     i.lowlevelop = LOWLEVELOP_SHM_FREE;
826     if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) {
827         /* no reason to wait here won't be acked anyway */
828         return -1;
829     }
830     return 0;
831 }
832
833 static gs_retval_t gscpdetachsocket(FTAID target)
834 {
835     struct internal_message i;
836     
837     i.im.receiver = target;
838     i.im.sender = myid;
839     i.im.operation = RESERVED_FOR_LOW_LEVEL;
840     i.im.size = sizeof(struct internal_message);
841     i.lowlevelop = LOWLEVELOP_UNREGISTER;
842     if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) {
843         /* no reason to wait here won't be acked anyway */
844         return -1;
845     }
846     return -1;
847 }
848
849
850 /* used to disassociate process from clearinghouse */
851 gs_retval_t gscpipc_free()
852 {
853     gs_int32_t x;
854     /* XXX OS if this function is called when there are still
855      subscribed FTAs for this process the clearinghouse will
856      crash
857      */
858     
859     if (clearinghouse!=0) {
860         return 0;
861     } else {
862         gs_int32_t x;
863         for (x=0; x<shmlistlen; x++) {
864             if (shmlist[x].type==SHM_RECV) {
865                 gscpdetachshm(shmlist[x].msgid);
866                 if (shmdt((gs_sp_t )shmlist[x].buf)!=0) {
867                     gslog(LOG_EMERG,"hostlib::error::could not "
868                           "detach shared memory\n");
869                 }
870             } else {
871                 gslog(LOG_EMERG,"hostlib::error::porccess freed while still "
872                       "attached to sending shared memory\n");
873             }
874         }
875     }
876     /* remove connection */
877     for(x=0;x<SOCKET_HASH_SZ;x++) {
878         if (connectionhash[x].used==1) {
879             //XXX detach does not work due to interleaved messages
880             // gscpdetachsocket(connectionhash[x].remoteid);
881             close(connectionhash[x].socket);
882         }
883     }
884     close(listen_socket);
885     return 0;
886 }
887
888 /* returns MSGID of current process */
889 FTAID gscpipc_getftaid()
890 {
891     return myid;
892 }
893
894
895 /* sends a message to a process */
896 gs_retval_t gscpipc_send(FTAID f, gs_int32_t operation, gs_sp_t  buf, gs_int32_t length, gs_int32_t block)
897 {
898     gs_int8_t b[MAXMSGSZ];
899     struct ipc_message * i;
900     struct shmlistentry * s;
901     if (length > MAXMSGSZ) {
902         gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n");
903         return -1;
904     }
905     i = (struct ipc_message *) b;
906     i->receiver=f;
907     i->sender=myid;
908     i->operation=operation;
909     i->size=length+sizeof(struct ipc_message);
910     memcpy(&i->data[0],buf,length);
911 #ifdef PRINTMSG
912     i->pmsgid=pmsgid;
913     pmsgid++;
914     gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n",
915           i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length);
916 #endif
917     if ((s=shmlist_find(f, SHM_RECV))!=0) {
918         // set the hint in the ringbuffer that there is something on the shared memory queue
919         s->buf->mqhint=1;
920     }
921     if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) {
922         gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno);
923         return -1;
924     }
925     return 0;
926 }
927
928 /* retrieve a message buf has to be at least of size MAXMSGSZ*/
929 gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t  buf, gs_int32_t * size, gs_int32_t block)
930 {
931     gs_int32_t w;
932     gs_int32_t x;
933     gs_int8_t b[MAXSZ];
934         gs_int32_t y;
935     
936     struct internal_message * i;
937     struct internal_remote_tuple * it;
938     gs_int32_t length;
939     
940     struct shmlistentry * s;
941     
942     i=(struct internal_message *)b;
943     it=(struct internal_remote_tuple *)b;
944     
945     for(y=0;(y < 10) || (block==1);y++) {
946         shmlist_drain_remote();
947                 length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1);
948                 if (length < -1) {
949                         /* problem */
950                         return -1;
951                 }
952                 if (length < 0) {
953                         /* we are nonblocking and have nothing to do */
954             if (block==1) {
955                 // we are expected to block for ever if it is 0 or 2 we return
956                 continue;
957             }
958                         return 0;
959                 }
960 #ifdef PRINTMSG
961                 gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n",
962               i->im.pmsgid,  i->im.sender, i->im.sender,i->im.operation,i->im.size);
963 #endif
964                 if (length <0) {
965                         gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno);
966                         return -1;
967                 }
968                 if (i->im.operation != RESERVED_FOR_LOW_LEVEL) {
969                         memcpy(buf,&(i->im.data[0]),i->im.size);
970                         *size=i->im.size-sizeof(struct ipc_message);
971                         *operation=i->im.operation;
972                         *f=i->im.sender;
973                         if ((s=shmlist_find(*f, SHM_SEND))!=0) {
974                                 // clear the hint in the ringbuffer to indicate we got the message
975                 s->buf->mqhint=0;
976                         }
977                         return 1;
978                 }
979                 switch (i->lowlevelop) {
980             case LOWLEVELOP_REGISTER:
981                 /* this should only get called if the process is the clearinghouse */
982 #ifdef PRINTMSG
983                 gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port);
984 #endif
985                 send_ack(i->im.sender);
986                 break;
987             case LOWLEVELOP_UNREGISTER:
988 #ifdef PRINTMSG
989                 gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port);
990 #endif    /* remove connection */
991                 for(x=0;x<SOCKET_HASH_SZ;x++) {
992                     if ( (connectionhash[x].used==1)
993                         && (connectionhash[x].remoteid.ip==i->im.sender.ip)
994                         && (connectionhash[x].remoteid.port==i->im.sender.port)) {
995                         gslog(LOG_EMERG,"Close by remote request %u\n",
996                               connectionhash[x].remoteid.port);
997                         // XXX closed when the other process dies
998                         // can't close it yet since we might have
999                         // some more messages
1000                         // close(connectionhash[x].socket);
1001                         connectionhash[x].used=0;
1002                     }
1003                 }
1004                 break;
1005             case LOWLEVELOP_SHM_REGISTER:
1006             {
1007                 gs_int32_t shmid;
1008                 struct ringbuf * r;
1009                 struct shmid_ds sms;
1010 #ifdef PRINTMSG
1011                 gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n",
1012                       i->im.sender.port,
1013                       i->shmtoken,i->shmsz);
1014 #endif
1015                 if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) {
1016                     if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) {
1017                         gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno);
1018                         send_nack(i->im.sender);
1019                     } else {
1020                         // Make sure all the momory gets mapped now
1021                                         for(x=0;x<length;x=x+1024) {
1022                             ((gs_uint8_t *) r)[x]=0;
1023                                         }
1024                         
1025 #ifdef PRINTMSG
1026                         gslog(LOG_EMERG,"Got a ring buffer at address %p (%u %u %u %u)\n"
1027                               ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);
1028 #endif
1029                         if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken,
1030                                         shmid,r,i->shmsz)<0) {
1031                             shmdt((gs_sp_t )r);
1032                             shmctl(shmid,IPC_RMID,&sms);
1033                             gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n");
1034                             send_nack(i->im.sender);
1035                         } else {
1036                             send_ack(i->im.sender);
1037                         }
1038                     }
1039                 } else {
1040                     gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno);
1041                     send_nack(i->im.sender);
1042                 }
1043             }
1044                 break;
1045             case LOWLEVELOP_SHM_REMOTE_REGISTER:
1046             {
1047                 gs_int32_t shmid;
1048                 struct ringbuf * r;
1049                 struct shmid_ds sms;
1050 #ifdef PRINTMSG
1051                 gslog(LOG_EMERG,"request to get remote shm %u  size %u\n",
1052                       i->im.sender.port,i->shmsz);
1053 #endif
1054                 if ((r=(struct ringbuf *)malloc(i->shmsz))==0) {
1055                     gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno);
1056                     send_nack(i->im.sender);
1057                 } else {
1058                     // make sure all the memory gets mapped now
1059                     for(x=0;x<length;x=x+1024) {
1060                         ((gs_uint8_t *) r)[x]=0;
1061                     }
1062                     r->reader=0;
1063                     r->writer=0;
1064                     r->length=i->shmsz;
1065                     r->end= i->shmsz-MAXTUPLESZ;
1066                     r->destid=i->im.receiver;
1067                     r->srcid=i->im.sender;
1068 #ifdef PRINTMSG
1069                     gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n"
1070                           ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);
1071 #endif
1072                     if (shmlist_add(i->im.sender,SHM_SEND,0,
1073                                     0,r,i->shmsz)<0) {
1074                         gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n");
1075                         send_nack(i->im.sender);
1076                     } else {
1077                         send_ack(i->im.sender);
1078                     }
1079                 }
1080             }
1081                 break;
1082             case LOWLEVELOP_SHM_FREE:
1083             {
1084                 struct shmlistentry * sm;
1085                 struct shmid_ds sms;
1086 #ifdef PRINTMSG
1087                 gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);
1088 #endif
1089                 if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {
1090 #ifdef PRINTMSG
1091                     gslog(LOG_EMERG,"freeing %u",sm->shmid);
1092 #endif
1093                     shmdt((gs_sp_t )sm->buf);
1094                     shmctl(sm->shmid,IPC_RMID,&sms);
1095                     shmlist_rm(i->im.sender,SHM_SEND);
1096                 }
1097             }
1098                 break;
1099             case LOWLEVELOP_SHM_REMOTE_FREE:
1100             {
1101                 struct shmlistentry * sm;
1102                 struct shmid_ds sms;
1103 #ifdef PRINTMSG
1104                 gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);
1105 #endif
1106                 if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {
1107 #ifdef PRINTMSG
1108                     gslog(LOG_EMERG,"freeing %u",sm->shmid);
1109 #endif
1110                     free((gs_sp_t ) sm->buf);
1111                     shmlist_rm(i->im.sender,SHM_SEND);
1112                 }
1113             }
1114                 break;
1115             case LOWLEVELOP_REMOTE_TUPLE:
1116             {
1117                 if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {
1118 #ifdef PRINTMSG
1119                     gslog(LOG_EMERG,"Received remote ringbuf message "
1120                           "for message of size %u\n",
1121                           it->size,it->im.size);
1122 #endif
1123 #ifdef BLOCKRINGBUFFER
1124                     while (SPACETOWRITE(s->buf)==0) {
1125                         usleep(1000);
1126                         gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for.");
1127                     }
1128                     memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
1129                     ADVANCEWRITE(s->buf);
1130 #else
1131                     if (SPACETOWRITE(s->buf)) {
1132                         memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);
1133                         ADVANCEWRITE(s->buf);
1134                     } else {
1135                         //                   gslog(LOG_EMERG,"r+");
1136                     }
1137 #endif
1138                 } else {
1139                     gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");
1140                 }
1141             }
1142                 break;
1143             default:
1144                 gslog(LOG_EMERG,"hostlib::error::unexpected message received\n");
1145                 return -1;
1146                 }
1147     }
1148     return 0;
1149 }
1150
1151 /* allocate a ringbuffer which allows receiving data from
1152  * the other process. returns 0 if didn't succeed and
1153  * returns an existing buffer if it exists */
1154 struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length)
1155 {
1156     struct shmid_ds sms;
1157     struct shmlistentry * se;
1158     gs_int8_t keybuf[1024];
1159     key_t shmtoken=0;
1160     gs_int32_t shmid=0;
1161     struct ringbuf * r;
1162     struct internal_message i;
1163     gs_int32_t x;
1164     
1165     se = shmlist_find(f, SHM_RECV);
1166     if (se) {
1167         return se->buf;
1168     }
1169     
1170     if (length<(4*MAXTUPLESZ)) {
1171         gslog(LOG_EMERG,
1172               "ERROR:buffersize in gscpipc_createshm  has to be "
1173               "at least %u Bytes long\n",
1174               4*MAXTUPLESZ);
1175         return 0;
1176     }
1177     
1178     if (myid.ip == f.ip) {
1179         sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port);
1180         if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1)  {
1181             gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n");
1182             return 0;
1183         }
1184         close(x);
1185         
1186         if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) {
1187             gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n");
1188             return 0;
1189         }
1190         
1191         if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL|
1192                                         IPC_CREAT|IPC_EXCL))==-1) {
1193             gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno);
1194             return 0;
1195         }
1196         if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) {
1197             gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n");
1198             return 0;
1199         }
1200         /* touch all memory once to map/reserve it now */
1201         for(x=0;x<length;x=x+1024) {
1202             ((gs_uint8_t *) r)[x]=0;
1203         }
1204         r->reader=0;
1205         r->writer=0;
1206         r->length=length;
1207         r->end= r->length-MAXTUPLESZ;
1208         r->destid=f;
1209         r->srcid=myid;
1210         i.im.receiver = f;
1211         i.im.sender = myid;
1212         i.im.operation = RESERVED_FOR_LOW_LEVEL;
1213         i.im.size = sizeof(struct internal_message);
1214         i.lowlevelop = LOWLEVELOP_SHM_REGISTER;
1215         i.shmtoken = shmtoken;
1216         i.shmsz=length;
1217         if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {
1218             if (wait_for_lowlevel_ack()<0) {
1219                 shmdt((gs_sp_t )r);
1220                 shmctl(shmid,IPC_RMID,&sms);
1221                 return 0;
1222             }
1223         }
1224         shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */  
1225         if (shmlist_add(f, SHM_RECV, shmtoken, 
1226                         shmid, r, length) <0) {
1227             shmdt((gs_sp_t )r);
1228             shmctl(shmid,IPC_RMID,&sms);
1229             i.im.receiver = f;
1230             i.im.sender = myid;
1231             i.im.operation = RESERVED_FOR_LOW_LEVEL;
1232             i.im.size = sizeof(struct internal_message);
1233             i.lowlevelop = LOWLEVELOP_SHM_FREE;
1234             i.shmtoken = shmtoken;
1235             msg_send(f,(gs_sp_t )&i,i.im.size,1);
1236             return 0;
1237         }
1238     } else {
1239         /* remote shared memory */
1240         if ((r=(struct ringbuf *)malloc(length))==0) {
1241             gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n");
1242             return 0;
1243         }
1244         /* touch all memory once to map/reserve it now */
1245         for(x=0;x<length;x=x+1024) {
1246             ((gs_uint8_t *) r)[x]=0;
1247         }
1248         r->reader=0;
1249         r->writer=0;
1250         r->length=length;
1251         r->end= r->length-MAXTUPLESZ;
1252         r->destid=f;
1253         r->srcid=myid;
1254         i.im.receiver = f;
1255         i.im.sender = myid;
1256         i.im.operation = RESERVED_FOR_LOW_LEVEL;
1257         i.im.size = sizeof(struct internal_message);
1258         i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER;
1259         i.shmtoken = shmtoken;
1260         i.shmsz=length;
1261         if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {
1262             if (wait_for_lowlevel_ack()<0) {
1263                 free(r);
1264                 return 0;
1265             }
1266         }
1267         if (shmlist_add(f, SHM_RECV, 0, 
1268                         0, r, length) <0) {
1269             free(r);
1270             i.im.receiver = f;
1271             i.im.sender = myid;
1272             i.im.operation = RESERVED_FOR_LOW_LEVEL;
1273             i.im.size = sizeof(struct internal_message);
1274             i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;
1275             msg_send(f,(gs_sp_t )&i,i.im.size,1);
1276             return 0;
1277         }
1278     }
1279     return r;
1280         
1281 }
1282
1283 /* finds a ringbuffer to send which was allocated by
1284  * gscpipc_creatshm and return 0 on an error */
1285
1286 struct ringbuf * gscpipc_getshm(FTAID f) 
1287 {
1288     struct shmlistentry * se;
1289     gs_int32_t recmsgid;
1290     se = shmlist_find(f, SHM_SEND);
1291     if (se) {
1292         return se->buf;
1293     }
1294     return 0;
1295 }     
1296
1297 /* frees shared memory to a particular proccess identified
1298  * by MSGID
1299  */ 
1300 gs_retval_t gscpipc_freeshm(FTAID f)
1301 {
1302     struct internal_message i;
1303     struct shmlistentry * sm;
1304     struct shmid_ds sms;
1305     if (myid.ip == f.ip) { 
1306         if ((sm=shmlist_find(f, SHM_RECV)) <0) {
1307             shmdt((gs_sp_t )sm->buf);
1308             shmctl(sm->shmid,IPC_RMID,&sms);
1309             i.im.receiver = f;
1310             i.im.sender = myid;
1311             i.im.operation = RESERVED_FOR_LOW_LEVEL;
1312             i.im.size = sizeof(struct internal_message);
1313             i.lowlevelop = LOWLEVELOP_SHM_FREE;
1314             i.shmtoken = sm->shmtoken;
1315             msg_send(f,(gs_sp_t )&i,i.im.size,1);
1316             shmlist_rm(f, SHM_RECV);
1317         }
1318     } else {
1319         if ((sm=shmlist_find(f, SHM_RECV)) <0) {
1320             free((gs_sp_t )sm->buf);
1321             i.im.receiver = f;
1322             i.im.sender = myid;
1323             i.im.operation = RESERVED_FOR_LOW_LEVEL;
1324             i.im.size = sizeof(struct internal_message);
1325             i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;
1326             msg_send(f,(gs_sp_t )&i,i.im.size,1);
1327             shmlist_rm(f, SHM_RECV);
1328         }
1329     }    
1330     return 0;
1331 }
1332
1333 gs_retval_t gscpipc_mqhint()
1334 {
1335     gs_int32_t x;
1336     for (x=0; x<shmlistlen; x++) {
1337         if (shmlist[x].type == SHM_SEND) {
1338             if (shmlist[x].buf->mqhint) {
1339                 return 1;
1340             }
1341         }
1342     }
1343     return 0;
1344 }