Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / include / gscpipc.h
index 44b79ea..5e8cf0a 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-/*
- * gscpipc.h:: defines the interface of the IPC channels used in
- * the GS-lite framework to communicate tuples between different
- * processes
- */
-
-#ifndef GSCPIPC_H
-#define GSCPIPC_H
-
-#include "gsconfig.h"
-#include "gstypes.h"
-#include "fta.h"
-
-#define RESERVED_FOR_LOW_LEVEL 0
-
-#define FTACALLBACK 1
-
-extern gs_uint64_t intupledrop;
-extern gs_uint64_t outtupledrop;
-extern gs_uint64_t intuple;
-extern gs_uint64_t outtuple;
-extern gs_uint64_t inbytes;
-extern gs_uint64_t outbytes;
-extern gs_uint64_t cycles;
-
-
-/* shared ringbuffer data structure used */
-
-#if defined(__sparc__) && defined(__sun__)
-#define ALIGN64
-#endif
-
-struct tuple {
-    FTAID f;
-    gs_int32_t  sz;
-    gs_uint32_t  next;
-#ifdef ALIGN64
-    gs_uint32_t  alignment;
-#endif
-    gs_int8_t  data[1];
-}__attribute__ ((__packed__));
-
-struct ringbuf {
-    gs_uint32_t   mqhint;
-    gs_uint32_t   writer;
-    gs_uint32_t   reader;
-    gs_uint32_t   end;
-    gs_int32_t  length;
-#ifdef ALIGN64
-    gs_uint32_t  alignment;
-#endif
-    FTAID  srcid;
-    FTAID  destid;
-    gs_int8_t  start[1];
-}__attribute__ ((__packed__));
-
-/* adds a buffer to the end of the sidequeue*/
-gs_retval_t sidequeue_append(FTAID ftaid, gs_sp_t buf, gs_int32_t length);
-
-/* removes a buffer from the top of the sidequeue*/
-gs_retval_t sidequeue_pop(FTAID * ftaid, gs_sp_t buf, gs_int32_t * length);
-
-/*
- *used to contact the clearinghouse process returns the MSGID of
- * the current process negative result indicates a problem
- */
-
-gs_retval_t  gscpipc_init(gs_int32_t  clearinghouse);
-
-/* used to disassociate process from clearinghouse */
-gs_retval_t  gscpipc_free();
-
-/* sends a message to a process */
-gs_retval_t  gscpipc_send(FTAID f, gs_int32_t  operation, gs_sp_t buf, gs_int32_t  length, gs_int32_t  block);
-
-/* retrieve a message buf has to be at least of size MAXMSGSZ returns 0
- if no message is available -1 on an error and 1 on sucess*/
-gs_retval_t  gscpipc_read(FTAID * f, gs_int32_t  * operation, gs_sp_t buf, gs_int32_t  * lenght, gs_int32_t  block);
-
-/* allocate a ringbuffer which allows receiving data from
- * the other process returns 0 if didn't succeed.
- * returns an existing buffer if it exists  and increments the refcnt*/
-
-struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t  length);
-
-/* finds a ringbuffer to send which was allocated by
- * gscpipc_creatshm and return 0 on an error */
-
-struct ringbuf * gscpipc_getshm(FTAID f);
-
-/* frees shared memory to a particular proccess identified
- * by ftaid if reference counter reaches 0
- */
-gs_retval_t  gscpipc_freeshm(FTAID f);
-
-
-/* returns true if on any sending ringbuffer the mqhint bit is true
- * can be used in lfta rts to indicate that the message queue should
- * be checked.
- */
-
-gs_retval_t  gscpipc_mqhint();
-
-/* Access macros for ringbuffer */
-
-#ifdef ALIGN64
-#define UP64(x) ((((x)+7)/8)*8)
-#else
-#define UP64(x) x
-#endif
-
-#define CURWRITE(buf)  ((struct tuple *)(&((buf)->start[(buf)->writer])))
-#define ADVANCEWRITE(buf)  CURWRITE(buf)->next=\
-((buf)->end > ((buf)->writer+UP64(CURWRITE(buf)->sz))+sizeof(struct tuple)-1)\
-? ((buf)->writer+UP64(CURWRITE(buf)->sz)+sizeof(struct tuple)-1) : 0; \
-(buf)->writer=CURWRITE(buf)->next;
-#define CURREAD(buf)  ((struct tuple *)(&((buf)->start[(buf)->reader])))
-#define ADVANCEREAD(buf) (buf)->reader=\
-CURREAD(buf)->next
-#define UNREAD(buf) ((buf)->reader != (buf)->writer)
-#define SPACETOWRITE(buf)   ((((buf)->reader <= (buf)->writer) \
-&& ((buf)->reader+(buf)->end-(buf)->writer) > MAXTUPLESZ)\
-|| (((buf)->reader > (buf)->writer) && (((buf)->reader-(buf)->writer)>MAXTUPLESZ)))
-#define HOWFULL(buf) (((((buf)->writer)-(buf)->reader)%((buf)->end)*1000)/((buf)->end))
-/* conservative estimate of how many tuples of size tuplesz fit in a ringbuffer */
-#define TUPLEFIT(buf,tuplesz) ((((((buf)->writer)-(buf)->reader)%((buf)->end))-MAXTUPLESZ)/ \
-((UP64(tuplesz))+sizeof(struct tuple)-1))
-#endif
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+/*\r
+ * gscpipc.h:: defines the interface of the IPC channels used in\r
+ * the GS-lite framework to communicate tuples between different\r
+ * processes\r
+ */\r
+\r
+#ifndef GSCPIPC_H\r
+#define GSCPIPC_H\r
+\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include "fta.h"\r
+\r
+#define RESERVED_FOR_LOW_LEVEL 0\r
+\r
+#define FTACALLBACK 1\r
+\r
+extern gs_uint64_t intupledrop;\r
+extern gs_uint64_t outtupledrop;\r
+extern gs_uint64_t intuple;\r
+extern gs_uint64_t outtuple;\r
+extern gs_uint64_t inbytes;\r
+extern gs_uint64_t outbytes;\r
+extern gs_uint64_t cycles;\r
+\r
+\r
+/* shared ringbuffer data structure used */\r
+\r
+#if defined(__sparc__) && defined(__sun__)\r
+#define ALIGN64\r
+#endif\r
+\r
+struct tuple {\r
+    FTAID f;\r
+    gs_int32_t  sz;\r
+    gs_uint32_t  next;\r
+#ifdef ALIGN64\r
+    gs_uint32_t  alignment;\r
+#endif\r
+    gs_int8_t  data[1];\r
+}__attribute__ ((__packed__));\r
+\r
+struct ringbuf {\r
+    gs_uint32_t   mqhint;\r
+    gs_uint32_t   writer;\r
+    gs_uint32_t   reader;\r
+    gs_uint32_t   end;\r
+    gs_int32_t  length;\r
+#ifdef ALIGN64\r
+    gs_uint32_t  alignment;\r
+#endif\r
+    FTAID  srcid;\r
+    FTAID  destid;\r
+    gs_int8_t  start[1];\r
+}__attribute__ ((__packed__));\r
+\r
+/* adds a buffer to the end of the sidequeue*/\r
+gs_retval_t sidequeue_append(FTAID ftaid, gs_sp_t buf, gs_int32_t length);\r
+\r
+/* removes a buffer from the top of the sidequeue*/\r
+gs_retval_t sidequeue_pop(FTAID * ftaid, gs_sp_t buf, gs_int32_t * length);\r
+\r
+/*\r
+ *used to contact the clearinghouse process returns the MSGID of\r
+ * the current process negative result indicates a problem\r
+ */\r
+\r
+gs_retval_t  gscpipc_init(gs_int32_t  clearinghouse);\r
+\r
+/* used to disassociate process from clearinghouse */\r
+gs_retval_t  gscpipc_free();\r
+\r
+/* sends a message to a process */\r
+gs_retval_t  gscpipc_send(FTAID f, gs_int32_t  operation, gs_sp_t buf, gs_int32_t  length, gs_int32_t  block);\r
+\r
+/* retrieve a message buf has to be at least of size MAXMSGSZ returns 0\r
+ if no message is available -1 on an error and 1 on sucess*/\r
+gs_retval_t  gscpipc_read(FTAID * f, gs_int32_t  * operation, gs_sp_t buf, gs_int32_t  * lenght, gs_int32_t  block);\r
+\r
+/* allocate a ringbuffer which allows receiving data from\r
+ * the other process returns 0 if didn't succeed.\r
+ * returns an existing buffer if it exists  and increments the refcnt*/\r
+\r
+struct ringbuf * gscpipc_createshm(FTAID f, gs_int32_t  length);\r
+\r
+/* finds a ringbuffer to send which was allocated by\r
+ * gscpipc_creatshm and return 0 on an error */\r
+\r
+struct ringbuf * gscpipc_getshm(FTAID f);\r
+\r
+/* frees shared memory to a particular proccess identified\r
+ * by ftaid if reference counter reaches 0\r
+ */\r
+gs_retval_t  gscpipc_freeshm(FTAID f);\r
+\r
+\r
+/* returns true if on any sending ringbuffer the mqhint bit is true\r
+ * can be used in lfta rts to indicate that the message queue should\r
+ * be checked.\r
+ */\r
+\r
+gs_retval_t  gscpipc_mqhint();\r
+\r
+/* Access macros for ringbuffer */\r
+\r
+#ifdef ALIGN64\r
+#define UP64(x) ((((x)+7)/8)*8)\r
+#else\r
+#define UP64(x) x\r
+#endif\r
+\r
+#define CURWRITE(buf)  ((struct tuple *)(&((buf)->start[(buf)->writer])))\r
+#define ADVANCEWRITE(buf)  CURWRITE(buf)->next=\\r
+((buf)->end > ((buf)->writer+UP64(CURWRITE(buf)->sz))+sizeof(struct tuple)-1)\\r
+? ((buf)->writer+UP64(CURWRITE(buf)->sz)+sizeof(struct tuple)-1) : 0; \\r
+(buf)->writer=CURWRITE(buf)->next;\r
+#define CURREAD(buf)  ((struct tuple *)(&((buf)->start[(buf)->reader])))\r
+#define ADVANCEREAD(buf) (buf)->reader=\\r
+CURREAD(buf)->next\r
+#define UNREAD(buf) ((buf)->reader != (buf)->writer)\r
+#define SPACETOWRITE(buf)   ((((buf)->reader <= (buf)->writer) \\r
+&& ((buf)->reader+(buf)->end-(buf)->writer) > MAXTUPLESZ)\\r
+|| (((buf)->reader > (buf)->writer) && (((buf)->reader-(buf)->writer)>MAXTUPLESZ)))\r
+#define HOWFULL(buf) (((((buf)->writer)-(buf)->reader)%((buf)->end)*1000)/((buf)->end))\r
+/* conservative estimate of how many tuples of size tuplesz fit in a ringbuffer */\r
+#define TUPLEFIT(buf,tuplesz) ((((((buf)->writer)-(buf)->reader)%((buf)->end))-MAXTUPLESZ)/ \\r
+((UP64(tuplesz))+sizeof(struct tuple)-1))\r
+#endif\r