Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / gscpipc.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 #include "gsconfig.h"\r
16 #include "gstypes.h"\r
17 #include "gscpipc.h"\r
18 #include "gshub.h"\r
19 #include <sys/types.h>\r
20 #include <sys/stat.h>\r
21 #include <sys/ipc.h>\r
22 #include <sys/shm.h>\r
23 #include <stdlib.h>\r
24 #include <stdio.h>\r
25 #include <fcntl.h>\r
26 #include <errno.h>\r
27 #include <sys/socket.h>\r
28 #include <netinet/in.h>\r
29 \r
30 #ifdef __linux__\r
31 #include <sys/ioctl.h>\r
32 #include <netinet/tcp.h>\r
33 #include <linux/sockios.h>\r
34 #endif\r
35 \r
36 #ifndef socklen_t\r
37 #define socklen_t gs_uint32_t\r
38 #endif\r
39 \r
40 \r
41 struct FTAID clearinghouseftaid;\r
42 \r
43 \r
44 \r
45 struct connection {\r
46     gs_int32_t socket; /* socket for connection */\r
47     gs_int32_t used;   /* 1 if the entry is in use */\r
48     FTAID remoteid; /* remoteid of connection */\r
49 };\r
50 \r
51 struct connection connectionhash[SOCKET_HASH_SZ];\r
52 \r
53 gs_int32_t clearinghouse=0;\r
54 \r
55 #define SHMTYPE 's'\r
56 #define SHM_RECV 1\r
57 #define SHM_SEND 2\r
58 \r
59 struct shmlistentry {\r
60     FTAID msgid;\r
61     gs_int32_t type;\r
62     key_t shmtoken;\r
63     gs_int32_t shmid;\r
64     struct ringbuf * buf;\r
65     gs_int32_t buffsize;\r
66 };\r
67 \r
68 gs_int32_t shmlistlen=0;\r
69 struct shmlistentry shmlist[MAX_NUMBER_OF_SHM];\r
70 \r
71 struct sq {\r
72     gs_int8_t buf[MAXMSGSZ];\r
73     gs_int32_t length;\r
74     struct sq * next;\r
75 };\r
76 \r
77 static struct sq * sqtop=0;\r
78 static struct sq * sqtail=0;\r
79 \r
80 /* adds a buffer to the end of the sidequeue*/\r
81 gs_retval_t gscpipc_sidequeue_append(gs_sp_t  buf, gs_int32_t length)\r
82 {\r
83     struct sq * s;\r
84     if ((s=malloc(sizeof(struct sq)))==0) {\r
85         gslog(LOG_EMERG,"Could not allocate memory for sidequeue");\r
86         return -1;\r
87     }\r
88     memcpy(&s->buf[0],buf,MAXMSGSZ);\r
89     s->length=length;\r
90     s->next=0;\r
91     if (sqtail) {\r
92         sqtail->next=s;\r
93         sqtail=s;\r
94     } else {\r
95         sqtop = s;\r
96         sqtail = s;\r
97     }\r
98     return 0;\r
99 }\r
100 \r
101 /* removes a buffer from the top of the sidequeue*/\r
102 gs_retval_t gscpipc_sidequeue_pop(gs_sp_t  buf, gs_int32_t * length, gs_int32_t buflen)\r
103 {\r
104     struct sq * s;\r
105     if (sqtop) {\r
106         if (sqtop->length > buflen) {\r
107             return -2;\r
108         }\r
109         memcpy(buf,&sqtop->buf[0],sqtop->length);\r
110         *length=sqtop->length;\r
111         s=sqtop;\r
112         sqtop=sqtop->next;\r
113         if (sqtop==0) sqtail=0;\r
114         free(s);\r
115         return 0;\r
116     }\r
117     return -1;\r
118 }\r
119 \r
120 struct ipc_message {\r
121     FTAID receiver;\r
122     FTAID sender;\r
123     gs_int32_t operation;\r
124 #ifdef PRINTMSG\r
125     gs_int32_t pmsgid;\r
126 #endif\r
127     gs_int32_t  size;\r
128     gs_int8_t data[4];\r
129 };\r
130 \r
131 #define LOWLEVELOP_ACK 0\r
132 #define LOWLEVELOP_NACK 1\r
133 #define LOWLEVELOP_REGISTER 2\r
134 #define LOWLEVELOP_UNREGISTER 3\r
135 #define LOWLEVELOP_SHM_REGISTER 4\r
136 #define LOWLEVELOP_SHM_FREE 5\r
137 #define LOWLEVELOP_SHM_REMOTE_REGISTER 6\r
138 #define LOWLEVELOP_SHM_REMOTE_FREE 7\r
139 #define LOWLEVELOP_REMOTE_TUPLE 8\r
140 \r
141 struct internal_message{\r
142     struct ipc_message im;\r
143     int lowlevelop;\r
144     key_t shmtoken;\r
145     int shmsz;\r
146     int result;\r
147 };\r
148 \r
149 struct internal_remote_tuple{\r
150     struct ipc_message im;\r
151     int lowlevelop;\r
152     int size;\r
153     gs_int8_t data[4];\r
154 };\r
155 \r
156 FTAID myid;\r
157 int listen_socket;\r
158 \r
159 #ifdef PRINTMSG\r
160 int pmsgid=0;\r
161 #endif\r
162 \r
163 \r
164 struct shmlistentry * shmlist_find(FTAID msgid, int type)\r
165 {\r
166     int x;\r
167     for (x=0; x<shmlistlen; x++) {\r
168         if ((shmlist[x].msgid.ip == msgid.ip)\r
169             && (shmlist[x].msgid.port == msgid.port)\r
170             && (shmlist[x].type == type)) {\r
171             return &(shmlist[x]);\r
172         }\r
173     }\r
174     return 0;\r
175 }\r
176 \r
177 gs_retval_t shmlist_add(FTAID msgid, int type, key_t shmtoken,\r
178                         int shmid, struct ringbuf * buf, int buffsize)\r
179 {\r
180     if (shmlist_find(msgid, type) !=0) {\r
181         return -1;\r
182     }\r
183     if (shmlistlen>=MAX_NUMBER_OF_SHM) {\r
184         gslog(LOG_EMERG,"GSCPTR::error::could not register shm to many"\r
185               "shm registered\n");\r
186         return -1;\r
187     }\r
188     shmlist[shmlistlen].msgid=msgid;\r
189     shmlist[shmlistlen].type=type;\r
190     shmlist[shmlistlen].shmtoken=shmtoken;\r
191     shmlist[shmlistlen].shmid=shmid;\r
192     shmlist[shmlistlen].buf=buf;\r
193     shmlist[shmlistlen].buffsize=buffsize;\r
194     shmlistlen++;\r
195     return 0;\r
196 }\r
197 \r
198 static gs_retval_t shmlist_rm(FTAID msgid, gs_int32_t type)\r
199 {\r
200     gs_int32_t x;\r
201     gs_int32_t move=0;\r
202     for (x=0;x<shmlistlen;x++) {\r
203         if ((shmlist[x].msgid.ip == msgid.ip)\r
204             && (shmlist[x].msgid.port == msgid.port)\r
205             &&(shmlist[x].type==type)) {\r
206             shmlistlen--;\r
207             move=1;\r
208         }\r
209         if ((move==1)&&(x<shmlistlen)) {\r
210             shmlist[x]=shmlist[x+1];\r
211         }\r
212     }\r
213     return 0;\r
214 }\r
215 \r
216 \r
217 /* starts the listen socket for the current proccess\r
218  if the listen_port is 0 then a random port is assigned\r
219  this function also sets myid\r
220  */\r
221 static gs_retval_t msg_init(gs_uint32_t clearinghouse) {\r
222     struct sockaddr_in sin;\r
223     gs_int32_t x;\r
224     FILE *f;\r
225     socklen_t sin_sz;\r
226     \r
227     /* mark all entries in the connection hash as unused */\r
228     for(x=0;x<SOCKET_HASH_SZ;x++) {\r
229         connectionhash[x].used=0;\r
230     }\r
231     \r
232     myid.index=0;\r
233     myid.streamid=0;\r
234     \r
235     bzero(&sin, sizeof(sin));\r
236     sin.sin_family = AF_INET;\r
237 #ifdef __NetBSD__\r
238     sin.sin_len = sizeof(sin);\r
239 #endif\r
240     sin.sin_addr.s_addr = 0;\r
241     sin.sin_port = 0;\r
242     \r
243     if ((listen_socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {\r
244         gslog(LOG_EMERG,"GSCMSGQ::error::could not create listen socket\n");\r
245         return -1;\r
246     }\r
247     \r
248     if (bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin)) < 0 ) {\r
249         gslog(LOG_EMERG,"GSCMSGQ::error::could not bind to socket for ip %x port %u with error %u \n",\r
250               ntohl(sin.sin_addr.s_addr), ntohs(sin.sin_port),errno);\r
251         return -1;\r
252     }\r
253     \r
254     if (listen(listen_socket, 64) < 0) {\r
255         gslog(LOG_EMERG,"GSCMSGQ::error::could not listen to socket for port %u \n",ntohs(sin.sin_port));\r
256         close(listen_socket);\r
257         return -1;\r
258     }\r
259     sin_sz=sizeof(sin);\r
260     if (getsockname(listen_socket, (struct sockaddr *) &sin, &sin_sz) < 0) {\r
261         gslog(LOG_EMERG,"GSCMSGQ::error::could not get local port number of listen socket\n");\r
262         return -1;\r
263     }\r
264     \r
265     myid.port=ntohs(sin.sin_port);\r
266     myid.ip=ntohl(sin.sin_addr.s_addr);\r
267     \r
268     return 0;\r
269 }\r
270 \r
271 static void closeconnection(gs_int32_t x) {\r
272     if (connectionhash[x].used==1)  {\r
273         close(connectionhash[x].socket);\r
274         connectionhash[x].used=0;\r
275     }\r
276 }\r
277 \r
278 static gs_retval_t writeall(gs_int32_t socket, void * b, gs_int32_t sz) {\r
279     gs_int32_t rv;\r
280     gs_sp_t  buf = (gs_sp_t )b;\r
281     gs_int32_t res=sz;\r
282     while(sz>0) {\r
283         if ((rv=write(socket,buf,sz))<0) {\r
284             if (errno == EINTR)\r
285                 continue;\r
286             else if (rv == EAGAIN || rv == EWOULDBLOCK) // CHECK THIS XXXOS\r
287                 return 0;\r
288             else\r
289                 return -1;\r
290         }\r
291         sz-=rv;\r
292         buf+=rv;\r
293     }\r
294     return res;\r
295 }\r
296 \r
297 static gs_retval_t msg_send(FTAID id, gs_sp_t  buf, gs_uint32_t len, gs_uint32_t block) {\r
298     struct sockaddr_in sin;\r
299     gs_int32_t x;\r
300     gs_int32_t u;\r
301     gs_int32_t sz;\r
302     gs_int32_t ret;\r
303     \r
304 try_send_again:\r
305     u=-1;\r
306     for(x=0;x<SOCKET_HASH_SZ;x++) {\r
307         if (connectionhash[x].used==1) {\r
308             if ((connectionhash[x].remoteid.ip==id.ip)\r
309                 && (connectionhash[x].remoteid.port==id.port)) {\r
310                 sz=htonl(len);\r
311                 if (block==0) {\r
312 #ifdef __linux__\r
313                     gs_int32_t datainbuffer;\r
314                     if (ioctl(connectionhash[x].socket,SIOCOUTQ,&datainbuffer)<1) {\r
315                         gslog(LOG_EMERG,\r
316                               "GSCMSGQ::error::could not determin free "\r
317                               "space in write buffer errno %u\n",errno);\r
318                         return -1;\r
319                     }\r
320                     if ((SOCK_BUF_SZ-datainbuffer) < (len+sizeof(gs_uint32_t))) {\r
321                         return 1;\r
322                     }\r
323                     \r
324 #else\r
325                     // low water mark in setsockoption is supported\r
326                     fd_set fs;\r
327                     gs_int32_t n;\r
328                     struct timeval tv;\r
329                     /* since we set the SNDLOWAT to MAXSZ+4 we know that if the write\r
330                      * select call returns with a 1 for that file descriptor at least that\r
331                      * much memory is available in the send buffer and we therefore\r
332                      * won't block sending\r
333                      */\r
334                     FD_ZERO(&fs);\r
335                     FD_SET(connectionhash[x].socket,&fs);\r
336                     n=connectionhash[x].socket+1;\r
337                     tv.tv_sec = 0;\r
338                     tv.tv_usec = 0;\r
339                     if(select(n,0,&fs,0,&tv)!=1) {\r
340                         return 1;\r
341                     }\r
342 #endif\r
343                 }\r
344 #ifdef PRINTMSG\r
345                 gslog(LOG_EMERG,"\twriting %u",ntohl(sz));\r
346 #endif\r
347                 ret = writeall(connectionhash[x].socket,&sz,sizeof(gs_uint32_t));\r
348                 if (!ret)\r
349                     return 1;\r
350                 else if (ret != sizeof(gs_uint32_t)) {\r
351                     gslog(LOG_EMERG,"GSCMSGQ::error::could not write length\n");\r
352                     return -1;\r
353                 }\r
354                 ret = writeall(connectionhash[x].socket,buf,len);\r
355                 if (!ret)\r
356                     return 1;\r
357                 else if (ret != len) {\r
358                     gslog(LOG_EMERG,"GSCMSGQ::error::could not write message\n");\r
359                     return -1;\r
360                 }\r
361 #ifdef PRINTMSG\r
362                 gslog(LOG_EMERG,"...done\n");\r
363 #endif\r
364                 return 0;\r
365             }\r
366         } else {\r
367             if (u==-1) { u=x; }\r
368         }\r
369     }\r
370     /* ok we don't have a connection make one */\r
371     if ((u>=SOCKET_HASH_SZ) || (u<0)) {\r
372         gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"\r
373               " TCP connection limit sending %d\n",u);\r
374         return -1;\r
375     }\r
376     connectionhash[u].remoteid=id;\r
377     connectionhash[u].remoteid.index=0;\r
378     connectionhash[u].remoteid.streamid=0;\r
379     sin.sin_family = AF_INET;\r
380 #ifdef __NetBSD__\r
381     sin.sin_len = sizeof(sin);\r
382 #endif\r
383     sin.sin_addr.s_addr = htonl(id.ip);\r
384     sin.sin_port = htons(id.port);\r
385     if ((connectionhash[u].socket = socket(PF_INET, SOCK_STREAM, 0)) < 0) {\r
386         gslog(LOG_EMERG,"GSCMSGQ::error::could not create socket\n");\r
387         return -1;\r
388     }\r
389     if (connect(connectionhash[u].socket,(struct sockaddr* )&sin,sizeof(sin)) < 0) {\r
390         gslog(LOG_EMERG,"GSCMSGQ::error::could not connect\n");\r
391         return -1;\r
392     }\r
393     sz=SOCK_BUF_SZ;\r
394     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDBUF,\r
395                    (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
396         gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer size\n");\r
397         return -1;\r
398     }\r
399     sz=SOCK_BUF_SZ;\r
400     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_RCVBUF,\r
401                    (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
402         gslog(LOG_EMERG,"GSCMSGQ::error::could not set receive buffer size\n");\r
403         return -1;\r
404     }\r
405 #ifndef __linux__\r
406     // Linux does not support low watermarks on sockets so we use ioctl SIOCOUTQ instead\r
407     sz=MAXSZ+4;\r
408     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_SNDLOWAT,\r
409                    (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
410         gslog(LOG_EMERG,"GSCMSGQ::error::could not set send buffer low watermark errorn %u\n",errno);\r
411         return -1;\r
412     }\r
413 #endif\r
414     sz=1;\r
415     if (setsockopt(connectionhash[u].socket, SOL_SOCKET, SO_KEEPALIVE,\r
416                    (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
417         gslog(LOG_EMERG,"GSCMSGQ::error::could not set keepalive\n");\r
418         return -1;\r
419     }\r
420     connectionhash[u].used=1;\r
421     goto try_send_again;\r
422     return -1;\r
423 }\r
424 \r
425 static gs_retval_t readall(gs_int32_t socket, void * b, gs_int32_t sz) {\r
426     gs_int32_t rv;\r
427     gs_sp_t  buf = (gs_sp_t )b;\r
428     gs_int32_t res=sz;\r
429     while(sz>0) {\r
430         if ((rv=read(socket,buf,sz))<0) {\r
431             if (errno == EINTR)\r
432                 continue;\r
433             gslog(LOG_EMERG,"read with error number %u \n",errno);\r
434             return -1;\r
435         }\r
436         if (rv==0) {\r
437             return 0;\r
438         }\r
439         sz-=rv;\r
440         buf+=rv;\r
441     }\r
442     return res;\r
443 }\r
444 \r
445 /* msg_recv return len if data is available -1 for a timeout and -2 for an error */\r
446 static gs_retval_t msg_recv(gs_sp_t  buf, gs_uint32_t buflen, gs_uint32_t block, gs_uint32_t check_sideque) {\r
447     struct sockaddr_in sin;\r
448     socklen_t sl;\r
449     fd_set fs;\r
450     gs_int32_t x,y;\r
451     gs_int32_t n;\r
452     gs_int32_t length;\r
453     struct timeval tv;\r
454     gs_int32_t sret;\r
455     static gs_int32_t last=0;\r
456     gs_uint32_t sz;\r
457     \r
458     if (check_sideque==1) {\r
459         if ((x=gscpipc_sidequeue_pop(buf, &length,buflen))==0) {\r
460             return length;\r
461         }\r
462         if (x==-2) {\r
463             gslog(LOG_EMERG,"GSCMSGQ::error::message in side queue to long\n");\r
464             return -2;\r
465         }\r
466     }\r
467     \r
468 read_again:\r
469     if (block==0) {\r
470         tv.tv_sec = 0;\r
471         tv.tv_usec = 0;\r
472     } else {\r
473         tv.tv_sec = 0;\r
474         tv.tv_usec = 100000;\r
475     }\r
476     FD_ZERO(&fs);\r
477     FD_SET(listen_socket,&fs);\r
478     n=listen_socket;\r
479     for(x=0;x<SOCKET_HASH_SZ;x++) {\r
480         if (connectionhash[x].used==1) {\r
481             FD_SET(connectionhash[x].socket,&fs);\r
482             if (n<connectionhash[x].socket) {\r
483                 n=connectionhash[x].socket;\r
484             }\r
485         }\r
486     }\r
487     n=n+1;\r
488     // now block\r
489     sret=select(n,&fs,0,0,&tv);\r
490     if ((sret<0) && (errno!=EINTR)) {\r
491         gslog(LOG_EMERG,"Select with error %u\n",errno);\r
492         return -2;\r
493     }\r
494     if (sret<=0) {\r
495         return -1;\r
496     }\r
497     if (FD_ISSET(listen_socket,&fs)) {\r
498         for (x=0;(x<SOCKET_HASH_SZ) && (connectionhash[x].used !=0) ;x++);\r
499         if (x>=SOCKET_HASH_SZ) {\r
500             gslog(LOG_EMERG,"GSCMSGQ::error::reached the maximum"\r
501                   "TCP connection limit accepting\n");\r
502             goto read_again;\r
503         }\r
504         sl=sizeof(sin);\r
505         if ((connectionhash[x].socket=accept(listen_socket,(struct sockaddr *)&(sin),&sl))\r
506                     < 0) {\r
507             gslog(LOG_EMERG,"GSCMSGQ::error::could not accept new connection\n");\r
508             goto read_again;\r
509         }\r
510         \r
511         sz=SOCK_BUF_SZ;\r
512         if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_SNDBUF,\r
513                        (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
514             fprintf(stderr,"GSCMSGQ::error::could not set send buffer size\n");\r
515             return -1;\r
516         }\r
517         sz=SOCK_BUF_SZ;\r
518         if (setsockopt(connectionhash[x].socket, SOL_SOCKET, SO_RCVBUF,\r
519                        (gs_sp_t )&sz, sizeof(sz)) != 0) {\r
520             fprintf(stderr,"GSCMSGQ::error::could not set receive buffer size\n");\r
521             return -1;\r
522         }\r
523         \r
524         sl=sizeof(sin);\r
525         if (getpeername(connectionhash[x].socket,(struct sockaddr *)&(sin),&sl)<0) {\r
526             gslog(LOG_EMERG,"GSCMSGQ::error::could not get peername on new connection\n");\r
527             close(connectionhash[x].socket);\r
528             goto read_again;\r
529         }\r
530         connectionhash[x].remoteid.ip=ntohl(sin.sin_addr.s_addr);\r
531         connectionhash[x].remoteid.port=ntohs(sin.sin_port);\r
532         connectionhash[x].remoteid.index=0;\r
533         connectionhash[x].remoteid.streamid=0;\r
534         connectionhash[x].used=1;\r
535 #ifdef PRINTMSG\r
536         gslog(LOG_EMERG,"Accepted from %u\n",connectionhash[x].remoteid.port);\r
537 #endif\r
538         goto read_again;\r
539     }\r
540     for(x=0;x<SOCKET_HASH_SZ;x++) {\r
541                 last=(last+1)%SOCKET_HASH_SZ;\r
542         if ((connectionhash[last].used==1) && (FD_ISSET(connectionhash[last].socket,&fs))){\r
543             gs_int32_t rsz;\r
544 #ifdef PRINTMSG\r
545             gslog(LOG_EMERG,"reading sret:%d block:%u...",sret,block);\r
546 #endif\r
547             if ((rsz=readall(connectionhash[last].socket,&length,sizeof(gs_uint32_t)))!=sizeof(gs_uint32_t)) {\r
548                 closeconnection(last);\r
549                 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing %u res %d\n",\r
550                       connectionhash[last].remoteid.port,rsz);\r
551                 continue;\r
552             }\r
553             length=ntohl(length);\r
554             if (buflen<length) {\r
555                 gs_int8_t d;\r
556                 gslog(LOG_EMERG,"GSCMSGQ::error::message to long (%u) for receive buffer (%u)\n",length,\r
557                       buflen);\r
558                 /* remove the data */\r
559                 for(y=0;y<length;y++)\r
560                     if (readall(connectionhash[last].socket,&d,1)!=1) {\r
561                         closeconnection(last);\r
562                         gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing for receive buffer mismatch\n");\r
563                         return -2;\r
564                     }\r
565                 return -2;\r
566             }\r
567             if (readall(connectionhash[last].socket,buf,length)!=length) {\r
568                 closeconnection(last);\r
569                 gslog(LOG_EMERG,"GSCMSGQ::error::connection is closing on data read\n");\r
570                 continue;\r
571             }\r
572 #ifdef PRINTMSG\r
573             gslog(LOG_EMERG,"reading done\n");\r
574 #endif\r
575             return length;\r
576         }\r
577     }\r
578     return -1;\r
579 }\r
580 \r
581 static gs_retval_t send_nack(FTAID recid) {\r
582     struct internal_message i;\r
583     i.im.receiver = recid;\r
584     i.im.sender = myid;\r
585     i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
586     i.im.size = sizeof(struct internal_message);\r
587     i.lowlevelop = LOWLEVELOP_NACK;\r
588     if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {\r
589         return 0;\r
590     } else {\r
591         return -1;\r
592     }\r
593     return -1;\r
594 }\r
595 \r
596 static gs_retval_t send_ack(FTAID recid) {\r
597     struct internal_message i;\r
598     i.im.receiver = recid;\r
599     i.im.sender = myid;\r
600     i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
601     i.im.size = sizeof(struct internal_message);\r
602     i.lowlevelop = LOWLEVELOP_ACK;\r
603     if (msg_send(recid,(gs_sp_t )&i,i.im.size,1) == 0) {\r
604         return 0;\r
605     } else {\r
606         return -1;\r
607     }\r
608     return -1;\r
609 }\r
610 \r
611 static gs_retval_t wait_for_lowlevel_ack() {\r
612     gs_int8_t b[MAXMSGSZ];\r
613     struct internal_message * i;\r
614     gs_int32_t res;\r
615     i=(struct internal_message *)b;\r
616     \r
617     /* this is bussy waiting if there is another\r
618      message waiting to be processed so make sure it is only\r
619      used where bussy waiting is OK. If that becomes\r
620      a probelm add a local queue\r
621      */\r
622     \r
623     while( 1==1) {\r
624         if ((res=msg_recv(b, MAXMSGSZ,1,0))>0) {\r
625             if ((i->im.operation == RESERVED_FOR_LOW_LEVEL)\r
626                 && (( i->lowlevelop ==  LOWLEVELOP_ACK)\r
627                     || ( i->lowlevelop ==  LOWLEVELOP_NACK))) {\r
628                     if ( i->lowlevelop ==  LOWLEVELOP_ACK) {\r
629                         return 0;\r
630                     } else {\r
631                         return 1;\r
632                     }\r
633                 }\r
634                         if (i->lowlevelop==LOWLEVELOP_REMOTE_TUPLE) {\r
635                                 struct internal_remote_tuple * it;\r
636                                 struct shmlistentry * s;\r
637                                 it=(struct internal_remote_tuple *)b;\r
638                 \r
639                                 if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {\r
640 #ifdef PRINTMSG\r
641                                         gslog(LOG_EMERG,"Received remote ringbuf message "\r
642                           "for message of size %u\n",\r
643                           it->size,it->im.size);\r
644 #endif\r
645 #ifdef BLOCKRINGBUFFER\r
646                                         while (SPACETOWRITE(s->buf)==0) {\r
647                                                 usleep(1000);\r
648                                                 gslog(LOG_ERR,"Dead in the water we can't "\r
649                               "drain the ringbuffer we wait for.");\r
650                                         }\r
651                                         memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
652                                         ADVANCEWRITE(s->buf);\r
653 #else\r
654                                         if (SPACETOWRITE(s->buf)) {\r
655                                                 memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
656                                                 ADVANCEWRITE(s->buf);\r
657                                         } else {\r
658                         //                                              gslog(LOG_EMERG,"r+");\r
659                                         }\r
660 #endif\r
661                                 } else {\r
662                                         gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");\r
663                                 }\r
664                         } else {\r
665                                 gscpipc_sidequeue_append(b, res);\r
666                         }\r
667         }  else {\r
668             if (res < -1) {\r
669                 /* got an error here */\r
670                 gslog(LOG_EMERG,"hostlib::error::received error "\r
671                       "during wait for low level ack\n");\r
672                 return -1;\r
673             }\r
674         }\r
675     }\r
676     /* never reached */\r
677     return -1;\r
678 }\r
679 \r
680 void shmlist_drain_remote()\r
681 {\r
682     gs_int32_t x;\r
683     gs_int8_t buf[MAXSZ];\r
684     struct internal_remote_tuple * it;\r
685     it = (struct internal_remote_tuple *) buf;\r
686     for (x=0; x<shmlistlen; x++) {\r
687         if ((shmlist[x].msgid.ip != myid.ip)&& (shmlist[x].type==SHM_SEND)) {\r
688             while (UNREAD(shmlist[x].buf)) {\r
689                 it->im.receiver = shmlist[x].msgid;\r
690                 it->im.sender = myid;\r
691                 it->im.operation = RESERVED_FOR_LOW_LEVEL;\r
692                 it->lowlevelop = LOWLEVELOP_REMOTE_TUPLE;\r
693                 it->size=UP64(CURREAD(shmlist[x].buf)->sz)+sizeof(struct tuple)-1;\r
694                 it->im.size = sizeof(struct internal_remote_tuple)-4+it->size;\r
695                 memcpy(&(it->data[0]),CURREAD(shmlist[x].buf),it->size);\r
696 #ifdef PRINTMSG\r
697                 gslog(LOG_EMERG,"Sending remote ringbuffer message of size %u %u\n",\r
698                       it->size, it->im.size);\r
699 #endif\r
700                 if (msg_send(shmlist[x].msgid,(gs_sp_t )it,it->im.size,0)==1) {\r
701                     break;\r
702                 }\r
703                 ADVANCEREAD(shmlist[x].buf);\r
704             }\r
705         }\r
706     }\r
707 }\r
708 \r
709 /*\r
710  *used to contact the clearinghouse process returns the MSGID of\r
711  * the current process\r
712  */\r
713 gs_retval_t gscpipc_init(gs_int32_t clearinghouse)\r
714 {\r
715     struct internal_message i;\r
716     key_t msqtoken=0;\r
717     gs_int32_t x;\r
718     endpoint gshub;\r
719     endpoint tmpclearinghouse;\r
720     \r
721     /* make sure priveleges can be set */\r
722     umask(0);\r
723     \r
724     clearinghouseftaid.index=0;\r
725     clearinghouseftaid.streamid=0;\r
726     \r
727     if (get_hub(&gshub)!=0) {\r
728         gslog(LOG_EMERG,"hostlib::error::could get hub\n");\r
729         return -1;\r
730     }\r
731     \r
732     if (clearinghouse!=0) {\r
733         // This is the clearinghouse\r
734         gs_int8_t buf[MAXMSGSZ];\r
735         \r
736         if (msg_init(1)<0) {\r
737             gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");\r
738             return -1;\r
739         }\r
740         \r
741         clearinghouseftaid.ip=myid.ip;\r
742         clearinghouseftaid.port=myid.port;\r
743         \r
744         tmpclearinghouse.ip=htonl(clearinghouseftaid.ip);\r
745         tmpclearinghouse.port=htons(clearinghouseftaid.port);\r
746         \r
747         if (set_instance(gshub, get_instance_name(), tmpclearinghouse)!=0) {\r
748             gslog(LOG_EMERG,"hostlib::error::clearinghouse could not set instance");\r
749             return -1;\r
750         }\r
751         \r
752         return 0;\r
753         \r
754     } else {\r
755         // This is an lfta/hfta/app\r
756         gs_int32_t res;\r
757         \r
758         if (get_instance(gshub,get_instance_name(),&tmpclearinghouse,1) < 0) {\r
759             gslog(LOG_EMERG,"hostlib::error::could not find clearinghouse\n");\r
760             return -1;\r
761         }\r
762         \r
763         clearinghouseftaid.ip=ntohl(tmpclearinghouse.ip);\r
764         clearinghouseftaid.port=ntohs(tmpclearinghouse.port);\r
765         \r
766         \r
767         if (msg_init(0)<0) {\r
768             gslog(LOG_EMERG,"hostlib::error::could not init msgq\n");\r
769             return -1;\r
770         }\r
771         \r
772         i.im.receiver = clearinghouseftaid;\r
773         i.im.sender = myid;\r
774         i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
775         i.im.size = sizeof(struct internal_message);\r
776         i.lowlevelop = LOWLEVELOP_REGISTER;\r
777 #ifdef PRINTMSG\r
778         i.im.pmsgid=pmsgid;\r
779         pmsgid++;\r
780         gslog(LOG_EMERG,"send a message (%d.%u) to %u with op "\r
781               "%u with size %u\n",\r
782               i.im.pmsgid,i.im.receiver.port,i.im.operation,\r
783               i.im.size);\r
784 #endif\r
785         if ((res=msg_send(clearinghouseftaid,(gs_sp_t )&i,i.im.size,1)) == 0) {\r
786             /* we can wait her for an ack since nobody should know\r
787              about us yet */\r
788         init_read_again:\r
789             if ((res=msg_recv((gs_sp_t )&i,\r
790                               sizeof(struct internal_message),\r
791                               1,0))>=0) {\r
792                 if (i.lowlevelop == LOWLEVELOP_ACK) {\r
793                     return 0;\r
794                 } else {\r
795                     gslog(LOG_EMERG,"hostlib::error::received unexpected message "\r
796                           "during initalization\n");\r
797                     return -1;\r
798                 }\r
799             } else {\r
800                 if (res<-1) {\r
801                     /* got an error here */\r
802                     gslog(LOG_EMERG,"hostlib::error::received error message "\r
803                           "during initalization\n");\r
804                     return -1;\r
805                 } else {\r
806                     goto init_read_again;\r
807                 }\r
808             }\r
809         }\r
810         gslog(LOG_EMERG,"hostlib::error::could not send on msgqueue\n");\r
811         return -1;\r
812     }\r
813     return 0;\r
814 }\r
815 \r
816 \r
817 static gs_retval_t gscpdetachshm(FTAID target)\r
818 {\r
819     struct internal_message i;\r
820     \r
821     i.im.receiver = target;\r
822     i.im.sender = myid;\r
823     i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
824     i.im.size = sizeof(struct internal_message);\r
825     i.lowlevelop = LOWLEVELOP_SHM_FREE;\r
826     if (msg_send(target,(gs_sp_t )&i,i.im.size,1)<0) {\r
827         /* no reason to wait here won't be acked anyway */\r
828         return -1;\r
829     }\r
830     return 0;\r
831 }\r
832 \r
833 static gs_retval_t gscpdetachsocket(FTAID target)\r
834 {\r
835     struct internal_message i;\r
836     \r
837     i.im.receiver = target;\r
838     i.im.sender = myid;\r
839     i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
840     i.im.size = sizeof(struct internal_message);\r
841     i.lowlevelop = LOWLEVELOP_UNREGISTER;\r
842     if (msg_send(target,(gs_sp_t )&i,i.im.size,1) <0) {\r
843         /* no reason to wait here won't be acked anyway */\r
844         return -1;\r
845     }\r
846     return -1;\r
847 }\r
848 \r
849 \r
850 /* used to disassociate process from clearinghouse */\r
851 gs_retval_t gscpipc_free()\r
852 {\r
853     gs_int32_t x;\r
854     /* XXX OS if this function is called when there are still\r
855      subscribed FTAs for this process the clearinghouse will\r
856      crash\r
857      */\r
858     \r
859     if (clearinghouse!=0) {\r
860         return 0;\r
861     } else {\r
862         gs_int32_t x;\r
863         for (x=0; x<shmlistlen; x++) {\r
864             if (shmlist[x].type==SHM_RECV) {\r
865                 gscpdetachshm(shmlist[x].msgid);\r
866                 if (shmdt((gs_sp_t )shmlist[x].buf)!=0) {\r
867                     gslog(LOG_EMERG,"hostlib::error::could not "\r
868                           "detach shared memory\n");\r
869                 }\r
870             } else {\r
871                 gslog(LOG_EMERG,"hostlib::error::porccess freed while still "\r
872                       "attached to sending shared memory\n");\r
873             }\r
874         }\r
875     }\r
876     /* remove connection */\r
877     for(x=0;x<SOCKET_HASH_SZ;x++) {\r
878         if (connectionhash[x].used==1) {\r
879             //XXX detach does not work due to interleaved messages\r
880             // gscpdetachsocket(connectionhash[x].remoteid);\r
881             close(connectionhash[x].socket);\r
882         }\r
883     }\r
884     close(listen_socket);\r
885     return 0;\r
886 }\r
887 \r
888 /* returns MSGID of current process */\r
889 FTAID gscpipc_getftaid()\r
890 {\r
891     return myid;\r
892 }\r
893 \r
894 \r
895 /* sends a message to a process */\r
896 gs_retval_t gscpipc_send(FTAID f, gs_int32_t operation, gs_sp_t  buf, gs_int32_t length, gs_int32_t block)\r
897 {\r
898     gs_int8_t b[MAXMSGSZ];\r
899     struct ipc_message * i;\r
900     struct shmlistentry * s;\r
901     if (length > MAXMSGSZ) {\r
902         gslog(LOG_EMERG,"hostlib::error::gscpipc_send msg to long\n");\r
903         return -1;\r
904     }\r
905     i = (struct ipc_message *) b;\r
906     i->receiver=f;\r
907     i->sender=myid;\r
908     i->operation=operation;\r
909     i->size=length+sizeof(struct ipc_message);\r
910     memcpy(&i->data[0],buf,length);\r
911 #ifdef PRINTMSG\r
912     i->pmsgid=pmsgid;\r
913     pmsgid++;\r
914     gslog(LOG_EMERG,"send a message (%d.%u) to %u with op %u with size %u\n",\r
915           i->pmsgid,f.ip,i->receiver.ip,i->operation,i->size,length);\r
916 #endif\r
917     if ((s=shmlist_find(f, SHM_RECV))!=0) {\r
918         // set the hint in the ringbuffer that there is something on the shared memory queue\r
919         s->buf->mqhint=1;\r
920     }\r
921     if (msg_send(f,(gs_sp_t )i,i->size,block) < 0) {\r
922         gslog(LOG_EMERG,"hostlib::error::gscpipc_send msgsnd failed errno (%u)\n",errno);\r
923         return -1;\r
924     }\r
925     return 0;\r
926 }\r
927 \r
928 /* retrieve a message buf has to be at least of size MAXMSGSZ*/\r
929 gs_retval_t gscpipc_read(FTAID * f, gs_int32_t * operation, gs_sp_t  buf, gs_int32_t * size, gs_int32_t block)\r
930 {\r
931     gs_int32_t w;\r
932     gs_int32_t x;\r
933     gs_int8_t b[MAXSZ];\r
934         gs_int32_t y;\r
935     \r
936     struct internal_message * i;\r
937     struct internal_remote_tuple * it;\r
938     gs_int32_t length;\r
939     \r
940     struct shmlistentry * s;\r
941     \r
942     i=(struct internal_message *)b;\r
943     it=(struct internal_remote_tuple *)b;\r
944     \r
945     for(y=0;(y < 10) || (block==1);y++) {\r
946         shmlist_drain_remote();\r
947                 length=msg_recv((gs_sp_t )b, MAXMSGSZ, block,1);\r
948                 if (length < -1) {\r
949                         /* problem */\r
950                         return -1;\r
951                 }\r
952                 if (length < 0) {\r
953                         /* we are nonblocking and have nothing to do */\r
954             if (block==1) {\r
955                 // we are expected to block for ever if it is 0 or 2 we return\r
956                 continue;\r
957             }\r
958                         return 0;\r
959                 }\r
960 #ifdef PRINTMSG\r
961                 gslog(LOG_EMERG,"got a message (%d.%u) from %u with op %u with size %u\n",\r
962               i->im.pmsgid,  i->im.sender, i->im.sender,i->im.operation,i->im.size);\r
963 #endif\r
964                 if (length <0) {\r
965                         gslog(LOG_EMERG,"gscpipc::Error receiving message %u\n",errno);\r
966                         return -1;\r
967                 }\r
968                 if (i->im.operation != RESERVED_FOR_LOW_LEVEL) {\r
969                         memcpy(buf,&(i->im.data[0]),i->im.size);\r
970                         *size=i->im.size-sizeof(struct ipc_message);\r
971                         *operation=i->im.operation;\r
972                         *f=i->im.sender;\r
973                         if ((s=shmlist_find(*f, SHM_SEND))!=0) {\r
974                                 // clear the hint in the ringbuffer to indicate we got the message\r
975                 s->buf->mqhint=0;\r
976                         }\r
977                         return 1;\r
978                 }\r
979                 switch (i->lowlevelop) {\r
980             case LOWLEVELOP_REGISTER:\r
981                 /* this should only get called if the process is the clearinghouse */\r
982 #ifdef PRINTMSG\r
983                 gslog(LOG_EMERG,"request to register %u\n",i->im.sender.port);\r
984 #endif\r
985                 send_ack(i->im.sender);\r
986                 break;\r
987             case LOWLEVELOP_UNREGISTER:\r
988 #ifdef PRINTMSG\r
989                 gslog(LOG_EMERG,"request to unregister %u\n",i->im.sender.port);\r
990 #endif    /* remove connection */\r
991                 for(x=0;x<SOCKET_HASH_SZ;x++) {\r
992                     if ( (connectionhash[x].used==1)\r
993                         && (connectionhash[x].remoteid.ip==i->im.sender.ip)\r
994                         && (connectionhash[x].remoteid.port==i->im.sender.port)) {\r
995                         gslog(LOG_EMERG,"Close by remote request %u\n",\r
996                               connectionhash[x].remoteid.port);\r
997                         // XXX closed when the other process dies\r
998                         // can't close it yet since we might have\r
999                         // some more messages\r
1000                         // close(connectionhash[x].socket);\r
1001                         connectionhash[x].used=0;\r
1002                     }\r
1003                 }\r
1004                 break;\r
1005             case LOWLEVELOP_SHM_REGISTER:\r
1006             {\r
1007                 gs_int32_t shmid;\r
1008                 struct ringbuf * r;\r
1009                 struct shmid_ds sms;\r
1010 #ifdef PRINTMSG\r
1011                 gslog(LOG_EMERG,"request to get shm %u token 0x%x size %u\n",\r
1012                       i->im.sender.port,\r
1013                       i->shmtoken,i->shmsz);\r
1014 #endif\r
1015                 if ((shmid = shmget(i->shmtoken,i->shmsz,IPC_RALL|IPC_WALL))!=-1) {\r
1016                     if (((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0)))==(gs_p_t)(-1)) {\r
1017                         gslog(LOG_EMERG,"hostlib::error::could not attach send shm errno (%u)\n",errno);\r
1018                         send_nack(i->im.sender);\r
1019                     } else {\r
1020                         // Make sure all the momory gets mapped now\r
1021                                         for(x=0;x<length;x=x+1024) {\r
1022                             ((gs_uint8_t *) r)[x]=0;\r
1023                                         }\r
1024                         \r
1025 #ifdef PRINTMSG\r
1026                         gslog(LOG_EMERG,"Got a ring buffer at address %p (%u %u %u %u)\n"\r
1027                               ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);\r
1028 #endif\r
1029                         if (shmlist_add(i->im.sender,SHM_SEND,i->shmtoken,\r
1030                                         shmid,r,i->shmsz)<0) {\r
1031                             shmdt((gs_sp_t )r);\r
1032                             shmctl(shmid,IPC_RMID,&sms);\r
1033                             gslog(LOG_EMERG,"hostlib::error::could not add shm internally\n");\r
1034                             send_nack(i->im.sender);\r
1035                         } else {\r
1036                             send_ack(i->im.sender);\r
1037                         }\r
1038                     }\r
1039                 } else {\r
1040                     gslog(LOG_EMERG,"hostlib::error::could not access send shm %u\n",errno);\r
1041                     send_nack(i->im.sender);\r
1042                 }\r
1043             }\r
1044                 break;\r
1045             case LOWLEVELOP_SHM_REMOTE_REGISTER:\r
1046             {\r
1047                 gs_int32_t shmid;\r
1048                 struct ringbuf * r;\r
1049                 struct shmid_ds sms;\r
1050 #ifdef PRINTMSG\r
1051                 gslog(LOG_EMERG,"request to get remote shm %u  size %u\n",\r
1052                       i->im.sender.port,i->shmsz);\r
1053 #endif\r
1054                 if ((r=(struct ringbuf *)malloc(i->shmsz))==0) {\r
1055                     gslog(LOG_EMERG,"hostlib::error::could not allocat send remote shm errno (%u)\n",errno);\r
1056                     send_nack(i->im.sender);\r
1057                 } else {\r
1058                     // make sure all the memory gets mapped now\r
1059                     for(x=0;x<length;x=x+1024) {\r
1060                         ((gs_uint8_t *) r)[x]=0;\r
1061                     }\r
1062                     r->reader=0;\r
1063                     r->writer=0;\r
1064                     r->length=i->shmsz;\r
1065                     r->end= i->shmsz-MAXTUPLESZ;\r
1066                     r->destid=i->im.receiver;\r
1067                     r->srcid=i->im.sender;\r
1068 #ifdef PRINTMSG\r
1069                     gslog(LOG_EMERG,"Got a remote ring buffer at address %p (%u %u %u %u)\n"\r
1070                           ,(void *)r,r->reader,r->writer,r->length,i->shmtoken);\r
1071 #endif\r
1072                     if (shmlist_add(i->im.sender,SHM_SEND,0,\r
1073                                     0,r,i->shmsz)<0) {\r
1074                         gslog(LOG_EMERG,"hostlib::error::could not add remote shm internally\n");\r
1075                         send_nack(i->im.sender);\r
1076                     } else {\r
1077                         send_ack(i->im.sender);\r
1078                     }\r
1079                 }\r
1080             }\r
1081                 break;\r
1082             case LOWLEVELOP_SHM_FREE:\r
1083             {\r
1084                 struct shmlistentry * sm;\r
1085                 struct shmid_ds sms;\r
1086 #ifdef PRINTMSG\r
1087                 gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);\r
1088 #endif\r
1089                 if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {\r
1090 #ifdef PRINTMSG\r
1091                     gslog(LOG_EMERG,"freeing %u",sm->shmid);\r
1092 #endif\r
1093                     shmdt((gs_sp_t )sm->buf);\r
1094                     shmctl(sm->shmid,IPC_RMID,&sms);\r
1095                     shmlist_rm(i->im.sender,SHM_SEND);\r
1096                 }\r
1097             }\r
1098                 break;\r
1099             case LOWLEVELOP_SHM_REMOTE_FREE:\r
1100             {\r
1101                 struct shmlistentry * sm;\r
1102                 struct shmid_ds sms;\r
1103 #ifdef PRINTMSG\r
1104                 gslog(LOG_EMERG,"request to free shm %u\n",i->im.sender);\r
1105 #endif\r
1106                 if ((sm=shmlist_find(i->im.sender,SHM_SEND)) !=0) {\r
1107 #ifdef PRINTMSG\r
1108                     gslog(LOG_EMERG,"freeing %u",sm->shmid);\r
1109 #endif\r
1110                     free((gs_sp_t ) sm->buf);\r
1111                     shmlist_rm(i->im.sender,SHM_SEND);\r
1112                 }\r
1113             }\r
1114                 break;\r
1115             case LOWLEVELOP_REMOTE_TUPLE:\r
1116             {\r
1117                 if ((s=shmlist_find(it->im.sender, SHM_RECV))!=0) {\r
1118 #ifdef PRINTMSG\r
1119                     gslog(LOG_EMERG,"Received remote ringbuf message "\r
1120                           "for message of size %u\n",\r
1121                           it->size,it->im.size);\r
1122 #endif\r
1123 #ifdef BLOCKRINGBUFFER\r
1124                     while (SPACETOWRITE(s->buf)==0) {\r
1125                         usleep(1000);\r
1126                         gslog(LOG_EMERG,"Dead in the water we can't drain the ringbuffer we wait for.");\r
1127                     }\r
1128                     memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
1129                     ADVANCEWRITE(s->buf);\r
1130 #else\r
1131                     if (SPACETOWRITE(s->buf)) {\r
1132                         memcpy(CURWRITE(s->buf),&(it->data[0]),it->size);\r
1133                         ADVANCEWRITE(s->buf);\r
1134                     } else {\r
1135                         //                   gslog(LOG_EMERG,"r+");\r
1136                     }\r
1137 #endif\r
1138                 } else {\r
1139                     gslog(LOG_EMERG,"Received tuple on msq for none existing remote ringbuffer\n");\r
1140                 }\r
1141             }\r
1142                 break;\r
1143             default:\r
1144                 gslog(LOG_EMERG,"hostlib::error::unexpected message received\n");\r
1145                 return -1;\r
1146                 }\r
1147     }\r
1148     return 0;\r
1149 }\r
1150 \r
1151 /* allocate a ringbuffer which allows receiving data from\r
1152  * the other process. returns 0 if didn't succeed and\r
1153  * returns an existing buffer if it exists */\r
1154 struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t length)\r
1155 {\r
1156     struct shmid_ds sms;\r
1157     struct shmlistentry * se;\r
1158     gs_int8_t keybuf[1024];\r
1159     key_t shmtoken=0;\r
1160     gs_int32_t shmid=0;\r
1161     struct ringbuf * r;\r
1162     struct internal_message i;\r
1163     gs_int32_t x;\r
1164     \r
1165     se = shmlist_find(f, SHM_RECV);\r
1166     if (se) {\r
1167         return se->buf;\r
1168     }\r
1169     \r
1170     if (length<(4*MAXTUPLESZ)) {\r
1171         gslog(LOG_EMERG,\r
1172               "ERROR:buffersize in gscpipc_createshm  has to be "\r
1173               "at least %u Bytes long\n",\r
1174               4*MAXTUPLESZ);\r
1175         return 0;\r
1176     }\r
1177     \r
1178     if (myid.ip == f.ip) {\r
1179         sprintf(keybuf,"/tmp/gscpapp_%u_%u.pid",myid.port,f.port);\r
1180         if ((x=open(keybuf,O_CREAT,S_IRWXU|S_IRWXG|S_IRWXO)) ==-1)  {\r
1181             gslog(LOG_EMERG,"hostlib::error::could not create shared memory id\n");\r
1182             return 0;\r
1183         }\r
1184         close(x);\r
1185         \r
1186         if ((shmtoken = ftok(keybuf,SHMTYPE))==-1) {\r
1187             gslog(LOG_EMERG,"hostlib::error::could not determin shm receive queue id\n");\r
1188             return 0;\r
1189         }\r
1190         \r
1191         if ((gs_int32_t)(shmid = shmget(shmtoken,length,IPC_RALL|IPC_WALL|\r
1192                                         IPC_CREAT|IPC_EXCL))==-1) {\r
1193             gslog(LOG_EMERG,"hostlib::error::could not access receive shm %u\n",errno);\r
1194             return 0;\r
1195         }\r
1196         if ((gs_p_t)(r=(struct ringbuf *)shmat(shmid,0,0))==(gs_p_t)(-1)) {\r
1197             gslog(LOG_EMERG,"hostlib::error::could not attach receive shm\n");\r
1198             return 0;\r
1199         }\r
1200         /* touch all memory once to map/reserve it now */\r
1201         for(x=0;x<length;x=x+1024) {\r
1202             ((gs_uint8_t *) r)[x]=0;\r
1203         }\r
1204         r->reader=0;\r
1205         r->writer=0;\r
1206         r->length=length;\r
1207         r->end= r->length-MAXTUPLESZ;\r
1208         r->destid=f;\r
1209         r->srcid=myid;\r
1210         i.im.receiver = f;\r
1211         i.im.sender = myid;\r
1212         i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
1213         i.im.size = sizeof(struct internal_message);\r
1214         i.lowlevelop = LOWLEVELOP_SHM_REGISTER;\r
1215         i.shmtoken = shmtoken;\r
1216         i.shmsz=length;\r
1217         if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {\r
1218             if (wait_for_lowlevel_ack()<0) {\r
1219                 shmdt((gs_sp_t )r);\r
1220                 shmctl(shmid,IPC_RMID,&sms);\r
1221                 return 0;\r
1222             }\r
1223         }\r
1224         shmctl(shmid,IPC_RMID,&sms); /* this will destroy the shm automatically after all processes detach */  \r
1225         if (shmlist_add(f, SHM_RECV, shmtoken, \r
1226                         shmid, r, length) <0) {\r
1227             shmdt((gs_sp_t )r);\r
1228             shmctl(shmid,IPC_RMID,&sms);\r
1229             i.im.receiver = f;\r
1230             i.im.sender = myid;\r
1231             i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
1232             i.im.size = sizeof(struct internal_message);\r
1233             i.lowlevelop = LOWLEVELOP_SHM_FREE;\r
1234             i.shmtoken = shmtoken;\r
1235             msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
1236             return 0;\r
1237         }\r
1238     } else {\r
1239         /* remote shared memory */\r
1240         if ((r=(struct ringbuf *)malloc(length))==0) {\r
1241             gslog(LOG_EMERG,"hostlib::error::could not malloc local part of remote ringbuffer\n");\r
1242             return 0;\r
1243         }\r
1244         /* touch all memory once to map/reserve it now */\r
1245         for(x=0;x<length;x=x+1024) {\r
1246             ((gs_uint8_t *) r)[x]=0;\r
1247         }\r
1248         r->reader=0;\r
1249         r->writer=0;\r
1250         r->length=length;\r
1251         r->end= r->length-MAXTUPLESZ;\r
1252         r->destid=f;\r
1253         r->srcid=myid;\r
1254         i.im.receiver = f;\r
1255         i.im.sender = myid;\r
1256         i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
1257         i.im.size = sizeof(struct internal_message);\r
1258         i.lowlevelop = LOWLEVELOP_SHM_REMOTE_REGISTER;\r
1259         i.shmtoken = shmtoken;\r
1260         i.shmsz=length;\r
1261         if (msg_send(f,(gs_sp_t )&i,i.im.size,1) == 0) {\r
1262             if (wait_for_lowlevel_ack()<0) {\r
1263                 free(r);\r
1264                 return 0;\r
1265             }\r
1266         }\r
1267         if (shmlist_add(f, SHM_RECV, 0, \r
1268                         0, r, length) <0) {\r
1269             free(r);\r
1270             i.im.receiver = f;\r
1271             i.im.sender = myid;\r
1272             i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
1273             i.im.size = sizeof(struct internal_message);\r
1274             i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;\r
1275             msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
1276             return 0;\r
1277         }\r
1278     }\r
1279     return r;\r
1280         \r
1281 }\r
1282 \r
1283 /* finds a ringbuffer to send which was allocated by\r
1284  * gscpipc_creatshm and return 0 on an error */\r
1285 \r
1286 struct ringbuf * gscpipc_getshm(FTAID f) \r
1287 {\r
1288     struct shmlistentry * se;\r
1289     gs_int32_t recmsgid;\r
1290     se = shmlist_find(f, SHM_SEND);\r
1291     if (se) {\r
1292         return se->buf;\r
1293     }\r
1294     return 0;\r
1295 }     \r
1296 \r
1297 /* frees shared memory to a particular proccess identified\r
1298  * by MSGID\r
1299  */ \r
1300 gs_retval_t gscpipc_freeshm(FTAID f)\r
1301 {\r
1302     struct internal_message i;\r
1303     struct shmlistentry * sm;\r
1304     struct shmid_ds sms;\r
1305     if (myid.ip == f.ip) { \r
1306         if ((sm=shmlist_find(f, SHM_RECV)) <0) {\r
1307             shmdt((gs_sp_t )sm->buf);\r
1308             shmctl(sm->shmid,IPC_RMID,&sms);\r
1309             i.im.receiver = f;\r
1310             i.im.sender = myid;\r
1311             i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
1312             i.im.size = sizeof(struct internal_message);\r
1313             i.lowlevelop = LOWLEVELOP_SHM_FREE;\r
1314             i.shmtoken = sm->shmtoken;\r
1315             msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
1316             shmlist_rm(f, SHM_RECV);\r
1317         }\r
1318     } else {\r
1319         if ((sm=shmlist_find(f, SHM_RECV)) <0) {\r
1320             free((gs_sp_t )sm->buf);\r
1321             i.im.receiver = f;\r
1322             i.im.sender = myid;\r
1323             i.im.operation = RESERVED_FOR_LOW_LEVEL;\r
1324             i.im.size = sizeof(struct internal_message);\r
1325             i.lowlevelop = LOWLEVELOP_SHM_REMOTE_FREE;\r
1326             msg_send(f,(gs_sp_t )&i,i.im.size,1);\r
1327             shmlist_rm(f, SHM_RECV);\r
1328         }\r
1329     }    \r
1330     return 0;\r
1331 }\r
1332 \r
1333 gs_retval_t gscpipc_mqhint()\r
1334 {\r
1335     gs_int32_t x;\r
1336     for (x=0; x<shmlistlen; x++) {\r
1337         if (shmlist[x].type == SHM_SEND) {\r
1338             if (shmlist[x].buf->mqhint) {\r
1339                 return 1;\r
1340             }\r
1341         }\r
1342     }\r
1343     return 0;\r
1344 }\r