-/* ------------------------------------------------\r
- Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
- \r
- http://www.apache.org/licenses/LICENSE-2.0\r
- \r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-#include <gscpipc.h>\r
-#include <ipcencoding.h>\r
-#include <stdlib.h>\r
-#include <stdio.h>\r
-#include <lapp.h>\r
-#include <lappregistries.h>\r
-#include <string.h>\r
-#include <unistd.h>\r
-#include <signal.h>\r
-#include <sys/mman.h>\r
-#include "rdtsc.h"\r
-// If POLLING is defined applications poll every 100 msec instead of blocking\r
-#define POLLING\r
-\r
-struct processtate curprocess = {0,0,0,255,0};\r
-struct FTAID clearinghouseftaid = {0,0,0,0};\r
-\r
-/*\r
- * sends the message passed in buf and waits for a result\r
- * if a message returned is not a result it is put in the\r
- * request queue. The resultsbuf has to be large enough\r
- * for the largest result\r
- */\r
-gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result)\r
-{\r
- struct hostcall * h = (struct hostcall *) msg;\r
- gs_int8_t buf[MAXMSGSZ];\r
- FTAID from;\r
- gs_int32_t length;\r
- gs_int32_t lowop;\r
-#ifdef PRINTMSG\r
- fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "\r
- "type %u with length %u\n",\r
- (f.ip>>24)&0xff,\r
- (f.ip>>16)&0xff,\r
- (f.ip>>8)&0xff,\r
- (f.ip)&0xff,\r
- f.port,h->callid,h->size);\r
-#endif\r
- if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {\r
- gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
- return -1;\r
- }\r
- h=(struct hostcall *) buf;\r
- while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {\r
-#ifdef PRINTMSG\r
- fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"\r
- " of type %u with length %u\n",\r
- (from.ip>>24)&0xff,\r
- (from.ip>>16)&0xff,\r
- (from.ip>>8)&0xff,\r
- (from.ip)&0xff,\r
- from.port,\r
- h->callid,h->size);\r
-#endif\r
- if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {\r
- h=(struct hostcall *) buf;\r
- if (h->callid > RESULT_OPCODE_BASE) {\r
- memcpy(result,buf,length);\r
- return 0;\r
- }\r
- if (sidequeue_append(from,buf,length)<0) {\r
- gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
- return -1;\r
- }\r
- }\r
- }\r
- gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");\r
- return -1;\r
-}\r
-\r
-\r
-gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])\r
-{\r
- FILE * f;\r
- \r
- if (curprocess.active != 0 ) {\r
- return -1;\r
- }\r
- \r
- switch (type) {\r
- case CLEARINGHOUSE:\r
- if (gscpipc_init(1) < 0) {\r
- gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "\r
- "clearinghouse process\n");\r
- return -1;\r
- }\r
- break;\r
- case LFTA:\r
-#ifdef __linux__\r
- mlockall(MCL_CURRENT|MCL_FUTURE);\r
-#endif\r
- case APP:\r
- case HFTA:\r
- if (gscpipc_init(0) < 0) {\r
- gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "\r
- "non clearinghouse process\n");\r
- return -1;\r
- }\r
- break;\r
- default:\r
- gslog(LOG_EMERG,"ERROR:Unknown process type\n");\r
- return -1;\r
- }\r
- \r
- // if the buffersize is zero then allocating shared memory\r
- // will fail. So only use it for the clearinghouse and LFTAs\r
- if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {\r
- gslog(LOG_EMERG,\r
- "ERROR:buffersize in hostlib_init has to "\r
- "be at least %u Bytes long\n",\r
- 4*MAXTUPLESZ);\r
- return -1;\r
- }\r
- \r
- curprocess.type=type;\r
- curprocess.buffersize=buffersize;\r
- curprocess.active = 1;\r
- curprocess.deviceid=deviceid;\r
- curprocess.mapcnt=mapcnt;\r
- curprocess.map=map;\r
- return 0;\r
-}\r
-\r
-void hostlib_free()\r
-{\r
- if (curprocess.active != 1 ) {\r
- return;\r
- }\r
- curprocess.active = 0;\r
- gscpipc_free();\r
-}\r
-\r
-\r
-gs_retval_t fta_find(FTAname name, gs_uint32_t reuse, FTAID *ftaid,\r
- gs_sp_t schema, gs_int32_t schemasz)\r
-{\r
- gs_int8_t rb[MAXRES];\r
- struct fta_find_arg a;\r
- struct ftafind_result * sr = (struct ftafind_result *)rb;\r
- \r
- a.h.callid = FTA_LOOKUP;\r
- a.h.size = sizeof(struct fta_find_arg);\r
- a.reuse=reuse;\r
- if (strlen(name)>=(MAXFTANAME-1)) {\r
- gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
- return -1;\r
- }\r
- strcpy(a.name,name);\r
- ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);\r
- if (sr->h.callid != FTAFIND_RESULT) {\r
- gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");\r
- return -1;\r
- }\r
- if (sr->result >= 0) {\r
- if (schema !=0) {\r
- if (strlen(sr->schema) >= schemasz) {\r
- gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");\r
- return -1;\r
- } else {\r
- strcpy(schema,sr->schema);\r
- }\r
- }\r
- *ftaid=sr->f;\r
- }\r
- return sr->result;\r
-}\r
-\r
-gs_retval_t fta_alloc_instance(FTAID subscriber,\r
- FTAID * ftaid, FTAname name, gs_sp_t schema,\r
- gs_uint32_t reusable,\r
- gs_int32_t command, gs_int32_t sz, void * data)\r
-{\r
- gs_int8_t rb[MAXRES];\r
- struct fta_alloc_instance_arg * a;\r
- struct fta_result * fr = (struct fta_result *)rb;\r
- struct ringbuf *r;\r
- \r
- /* make sure we have the share memory required */\r
- if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {\r
- gslog(LOG_EMERG,"ERROR:could not allocate shared memory"\r
- "for FTA %s\n",name);\r
- return -1;\r
- }\r
- \r
- if (strlen(name)>=(MAXFTANAME-1)) {\r
- gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
- return -1;\r
- }\r
- \r
- if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
- gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
- return -1;\r
- }\r
- \r
- a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);\r
- \r
- a->h.callid = FTA_ALLOC_INSTANCE;\r
- a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;\r
- a->f=*ftaid;\r
- a->subscriber=subscriber;\r
- a->reusable=reusable;\r
- a->command = command;\r
- a->sz = sz;\r
- memcpy(&a->data[0],data,sz);\r
- strcpy(a->name,name);\r
- strcpy(a->schema,schema);\r
- \r
- ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);\r
- \r
- if (fr->h.callid != FTA_RESULT) {\r
- gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
- return -1;\r
- }\r
- \r
- *ftaid=fr->f;\r
- \r
- if (fr->result==0) {\r
- gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);\r
- return streamregistry_add(*ftaid,r);\r
- }\r
- \r
- return fr->result;\r
-}\r
-\r
-gs_retval_t fta_alloc_print_instance(FTAID subscriber,\r
- FTAID * ftaid,\r
- FTAname name, gs_sp_t schema, gs_uint32_t reusable,\r
- gs_int32_t command, gs_int32_t sz, void * data,\r
- gs_sp_t path,gs_sp_t basename,\r
- gs_sp_t temporal_field, gs_sp_t split_field,\r
- gs_uint32_t delta, gs_uint32_t split)\r
-{\r
- gs_int8_t rb[MAXRES];\r
- struct fta_alloc_instance_arg * a;\r
- struct fta_result * fr = (struct fta_result *)rb;\r
- \r
- if ((strlen(path)>=MAXPRINTSTRING-1)\r
- || (strlen(basename)>=MAXPRINTSTRING-1)\r
- || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {\r
- gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"\r
- " arguments to long\n");\r
- return -1;\r
- }\r
- if (strlen(name)>=(MAXFTANAME-1)) {\r
- gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
- return -1;\r
- }\r
- \r
- if (strlen(schema)>=(MAXSCHEMASZ-1)) {\r
- gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);\r
- return -1;\r
- }\r
- \r
- a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);\r
- \r
- a->h.callid = FTA_ALLOC_PRINT_INSTANCE;\r
- a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;\r
- a->f=*ftaid;\r
- a->subscriber=subscriber;\r
- a->reusable=reusable;\r
- a->split=split;\r
- strcpy(a->name,name);\r
- strcpy(a->schema,schema);\r
- a->command = command;\r
- a->sz = sz;\r
- strcpy(a->path,path);\r
- strcpy(a->basename,basename);\r
- strcpy(a->temporal_field,temporal_field);\r
- strcpy(a->split_field,split_field);\r
- a->delta=delta;\r
- memcpy(&a->data[0],data,sz);\r
- \r
- ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);\r
- \r
- if (fr->h.callid != FTA_RESULT) {\r
- gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
- return -1;\r
- }\r
- \r
- *ftaid=fr->f;\r
- \r
- return fr->result;\r
-}\r
-\r
-gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t recursive)\r
-{\r
- gs_int8_t rb[MAXRES];\r
- struct fta_free_instance_arg a;\r
- struct standard_result * sr = (struct standard_result *)rb;\r
- struct ringbuf *r;\r
- \r
- a.h.callid = FTA_FREE_INSTANCE;\r
- a.h.size = sizeof(struct fta_free_instance_arg);\r
- a.subscriber=subscriber;\r
- a.f=ftaid;\r
- a.recursive=recursive;\r
- ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);\r
- if (sr->h.callid != STANDARD_RESULT) {\r
- gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
- return -1;\r
- }\r
- \r
- /* make sure we remove the mapping*/\r
- streamregistry_remove(ftaid);\r
- \r
- return sr->result;\r
-}\r
-\r
-gs_retval_t fta_control(FTAID subscriber,\r
- FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)\r
-{\r
- gs_int8_t rb[MAXRES];\r
- struct fta_control_arg * a;\r
- struct standard_result * sr = (struct standard_result *)rb;\r
- \r
- a = alloca(sizeof(struct fta_control_arg) + sz);\r
- \r
- a->h.callid = FTA_CONTROL;\r
- a->h.size = sizeof(struct fta_control_arg)+ sz;\r
- a->subscriber=subscriber;\r
- a->f=ftaid;\r
- a->command = command;\r
- a->sz = sz;\r
- memcpy(&a->data[0],value,sz);\r
- \r
- ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);\r
- \r
- if (sr->h.callid != STANDARD_RESULT) {\r
- gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
- return -1;\r
- }\r
- \r
- return sr->result;\r
-}\r
-\r
-gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,\r
- gs_uint32_t sz, fta_stat * trace){\r
-#ifdef CLEARINGHOUSE_HEARTBEAT\r
- struct fta_heartbeat_arg * a;\r
- a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));\r
- a->h.callid = FTA_HEARTBEAT;\r
- a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));\r
- a->sender=self;\r
- a->trace_id=trace_id;\r
- a->sz=sz;\r
- if (sz!=0) {\r
- memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));\r
- }\r
-#ifdef PRINTMSG\r
- fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "\r
- "type %u with length %u\n",\r
- (clearinghouseftaid.ip>>24)&0xff,\r
- (clearinghouseftaid.ip>>16)&0xff,\r
- (clearinghouseftaid.ip>>8)&0xff,\r
- (clearinghouseftaid.ip)&0xff,\r
- clearinghouseftaid.port,a->h.callid,a->h.size);\r
-#endif\r
- if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {\r
- gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
- return -1;\r
- }\r
-#endif\r
- return 0;\r
-}\r
-\r
-gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){\r
- struct fta_notify_producer_failure_arg a;\r
- a.h.callid = FTA_PRODUCER_FAILURE;\r
- a.h.size = sizeof(struct fta_notify_producer_failure_arg);\r
- a.sender=self;\r
- a.producer=producer;\r
-#ifdef PRINTMSG\r
- fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "\r
- "type %u with length %u\n",\r
- (clearinghouseftaid.ip>>24)&0xff,\r
- (clearinghouseftaid.ip>>16)&0xff,\r
- (clearinghouseftaid.ip>>8)&0xff,\r
- (clearinghouseftaid.ip)&0xff,\r
- clearinghouseftaid.port,a.h.callid,a.h.size);\r
-#endif\r
- if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {\r
- gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
- return -1;\r
- }\r
- return 0;\r
-}\r
-\r
-gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)\r
-{\r
- gs_int8_t rb[MAXRES];\r
- struct process_control_arg * a;\r
- struct standard_result * sr = (struct standard_result *)rb;\r
- \r
- \r
- a = alloca(sizeof(struct process_control_arg) + sz);\r
- \r
- a->h.callid = PROCESS_CONTROL;\r
- a->h.size = sizeof(struct process_control_arg)+ sz;\r
- a->command = command;\r
- a->sz = sz;\r
- memcpy(&a->data[0],value,sz);\r
- \r
- ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);\r
- \r
- if (sr->h.callid != STANDARD_RESULT) {\r
- gslog(LOG_EMERG,"ERROR:Wrong result code received\n");\r
- return -1;\r
- }\r
- \r
- return sr->result;\r
-}\r
-\r
-\r
-static void timeouthandler ()\r
-{\r
- struct timeout_result a;\r
- \r
- a.h.callid=TIMEOUT;\r
- a.h.size=sizeof(struct timeout_result);\r
- if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {\r
- gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");\r
- }\r
-}\r
-\r
-gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,\r
- gs_int32_t tbuf_size, gs_int32_t timeout)\r
-{\r
- struct ringbuf * r;\r
- FTAID from;\r
- gs_int32_t length;\r
- gs_int8_t buf[MAXMSGSZ];\r
- gs_int32_t lopp;\r
- FTAID * f;\r
- static gs_uint64_t s1=0;\r
- static gs_uint64_t s2;\r
- if (s1==0) {\r
- s1=rdtsc();\r
- }\r
- s2=rdtsc();\r
- cycles+=(s2-s1);\r
-start:\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"CHECK RINGBUFS\n");\r
-#endif\r
-#ifndef POLLING\r
- /* use chance to cleanout message queue no reason\r
- to keep anything else */\r
- while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
-#endif\r
- \r
- streamregistry_getactiveringbuf_reset();\r
- while ((r=streamregistry_getactiveringbuf())>0) {\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"\r
- "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
- r->length);\r
- if (UNREAD(r)) {\r
- fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,\r
- (CURREAD(r)->f.ip>>24)&0xff,\r
- (CURREAD(r)->f.ip>>16)&0xff,\r
- (CURREAD(r)->f.ip>>8)&0xff,\r
- (CURREAD(r)->f.ip)&0xff,\r
- CURREAD(r)->f.port,\r
- CURREAD(r)->f.streamid,\r
- CURREAD(r)->sz);\r
- }\r
- \r
-#endif\r
- if (UNREAD(r)) {\r
- *ftaid=(CURREAD(r)->f);\r
- *size=CURREAD(r)->sz;\r
- memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);\r
- intuple++;\r
- inbytes+=CURREAD(r)->sz;\r
- ADVANCEREAD(r);\r
- s1=rdtsc();\r
- return 0;\r
- }\r
- }\r
- if (timeout == -1) {\r
- *size=0;\r
- s1=rdtsc();\r
- return 1;\r
- }\r
- if (timeout !=0) {\r
- signal(SIGALRM, timeouthandler);\r
- alarm(timeout);\r
- }\r
- \r
-#ifndef POLLING\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"START BLOCKCALLS\n");\r
-#endif\r
- streamregistry_getactiveftaid_reset();\r
- while ((f=streamregistry_getactiveftaid())!=0) {\r
- struct gscp_get_buffer_arg a;\r
- a.h.callid = GSCP_GET_BUFFER;\r
- a.h.size = sizeof(struct gscp_get_buffer_arg);\r
- a.timeout = timeout;\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Waiting for %u.%u.%u.%u:%u\n",\r
- (f->ip>>24)&0xff,\r
- (f->ip>>16)&0xff,\r
- (f->ip>>8)&0xff,\r
- (f->ip)&0xff,\r
- f->port\r
- );\r
-#endif\r
- if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {\r
- s1=rdtsc();\r
- return -1;\r
- }\r
- }\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"BLOCK\n");\r
-#endif\r
- while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {\r
-#else // If we poll we return after 100 msec\r
- sleepagain:\r
- while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {\r
-#endif\r
- struct standard_result * sr = (struct standard_result *) buf;\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Got return code %u\n",sr->h.callid);\r
-#endif\r
- if (lopp==FTACALLBACK) {\r
- if (timeout != 0) {\r
- signal(SIGALRM, SIG_IGN);\r
- }\r
- if (sr->h.callid == WAKEUP) {\r
- /* use chance to cleanout message queue no reason\r
- to keep anything else */\r
- while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
- goto start;\r
- }\r
- if (sr->h.callid == TIMEOUT) {\r
- /* use chance to cleanout message queue no reason\r
- to keep anything else */\r
- while (gscpipc_read(&from,&lopp,buf,&length,0)>0);\r
- *size=0;\r
- s1=rdtsc();\r
- return 1;\r
- }\r
- if (sidequeue_append(from,buf,length)<0) {\r
- gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");\r
- s1=rdtsc();\r
- return -1;\r
- }\r
- }\r
- }\r
-#ifdef POLLING\r
- streamregistry_getactiveringbuf_reset();\r
- while ((r=streamregistry_getactiveringbuf())>0) {\r
-#ifdef PRINTMSG\r
- fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"\r
- "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,\r
- r->length);\r
- if (UNREAD(r)) {\r
- fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,\r
- (CURREAD(r)->f.ip>>24)&0xff,\r
- (CURREAD(r)->f.ip>>16)&0xff,\r
- (CURREAD(r)->f.ip>>8)&0xff,\r
- (CURREAD(r)->f.ip)&0xff,\r
- CURREAD(r)->f.port,\r
- CURREAD(r)->f.streamid,\r
- CURREAD(r)->sz);\r
- }\r
- \r
-#endif\r
- if (UNREAD(r)) {\r
- *ftaid=(CURREAD(r)->f);\r
- *size=CURREAD(r)->sz;\r
- memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);\r
- intuple++;\r
- inbytes+=CURREAD(r)->sz;\r
- ADVANCEREAD(r);\r
- if (timeout != 0) {\r
- signal(SIGALRM, SIG_IGN);\r
- }\r
- s1=rdtsc();\r
- return 0;\r
- }\r
- }\r
- goto sleepagain; // Try again\r
-#endif\r
- gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");\r
- /* we should never get here */\r
- s1=rdtsc();\r
- return -1;\r
- }\r
- \r
- \r
- \r
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+#include <gscpipc.h>
+#include <ipcencoding.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <lapp.h>
+#include <lappregistries.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <sys/mman.h>
+#include "rdtsc.h"
+// If POLLING is defined applications poll every 100 msec instead of blocking
+#define POLLING
+
+struct processtate curprocess = {0,0,0,255,0};
+struct FTAID clearinghouseftaid = {0,0,0,0};
+
+/*
+ * sends the message passed in buf and waits for a result
+ * if a message returned is not a result it is put in the
+ * request queue. The resultsbuf has to be large enough
+ * for the largest result
+ */
+gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result)
+{
+ struct hostcall * h = (struct hostcall *) msg;
+ gs_int8_t buf[MAXMSGSZ];
+ FTAID from;
+ gs_int32_t length;
+ gs_int32_t lowop;
+#ifdef PRINTMSG
+ fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "
+ "type %u with length %u\n",
+ (f.ip>>24)&0xff,
+ (f.ip>>16)&0xff,
+ (f.ip>>8)&0xff,
+ (f.ip)&0xff,
+ f.port,h->callid,h->size);
+#endif
+ if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {
+ gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+ return -1;
+ }
+ h=(struct hostcall *) buf;
+ while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {
+#ifdef PRINTMSG
+ fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"
+ " of type %u with length %u\n",
+ (from.ip>>24)&0xff,
+ (from.ip>>16)&0xff,
+ (from.ip>>8)&0xff,
+ (from.ip)&0xff,
+ from.port,
+ h->callid,h->size);
+#endif
+ if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
+ h=(struct hostcall *) buf;
+ if (h->callid > RESULT_OPCODE_BASE) {
+ memcpy(result,buf,length);
+ return 0;
+ }
+ if (sidequeue_append(from,buf,length)<0) {
+ gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
+ return -1;
+ }
+ }
+ }
+ gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");
+ return -1;
+}
+
+
+gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])
+{
+ FILE * f;
+
+ if (curprocess.active != 0 ) {
+ return -1;
+ }
+
+ switch (type) {
+ case CLEARINGHOUSE:
+ if (gscpipc_init(1) < 0) {
+ gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
+ "clearinghouse process\n");
+ return -1;
+ }
+ break;
+ case LFTA:
+#ifdef __linux__
+ mlockall(MCL_CURRENT|MCL_FUTURE);
+#endif
+ case APP:
+ case HFTA:
+ if (gscpipc_init(0) < 0) {
+ gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
+ "non clearinghouse process\n");
+ return -1;
+ }
+ break;
+ default:
+ gslog(LOG_EMERG,"ERROR:Unknown process type\n");
+ return -1;
+ }
+
+ // if the buffersize is zero then allocating shared memory
+ // will fail. So only use it for the clearinghouse and LFTAs
+ if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {
+ gslog(LOG_EMERG,
+ "ERROR:buffersize in hostlib_init has to "
+ "be at least %u Bytes long\n",
+ 4*MAXTUPLESZ);
+ return -1;
+ }
+
+ curprocess.type=type;
+ curprocess.buffersize=buffersize;
+ curprocess.active = 1;
+ curprocess.deviceid=deviceid;
+ curprocess.mapcnt=mapcnt;
+ curprocess.map=map;
+ return 0;
+}
+
+void hostlib_free()
+{
+ if (curprocess.active != 1 ) {
+ return;
+ }
+ curprocess.active = 0;
+ gscpipc_free();
+}
+
+
+gs_retval_t fta_find(FTAname name, gs_uint32_t reuse, FTAID *ftaid,
+ gs_sp_t schema, gs_int32_t schemasz)
+{
+ gs_int8_t rb[MAXRES];
+ struct fta_find_arg a;
+ struct ftafind_result * sr = (struct ftafind_result *)rb;
+
+ a.h.callid = FTA_LOOKUP;
+ a.h.size = sizeof(struct fta_find_arg);
+ a.reuse=reuse;
+ if (strlen(name)>=(MAXFTANAME-1)) {
+ gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+ return -1;
+ }
+ strcpy(a.name,name);
+ ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
+ if (sr->h.callid != FTAFIND_RESULT) {
+ gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");
+ return -1;
+ }
+ if (sr->result >= 0) {
+ if (schema !=0) {
+ if (strlen(sr->schema) >= schemasz) {
+ gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");
+ return -1;
+ } else {
+ strcpy(schema,sr->schema);
+ }
+ }
+ *ftaid=sr->f;
+ }
+ return sr->result;
+}
+
+gs_retval_t fta_alloc_instance(FTAID subscriber,
+ FTAID * ftaid, FTAname name, gs_sp_t schema,
+ gs_uint32_t reusable,
+ gs_int32_t command, gs_int32_t sz, void * data)
+{
+ gs_int8_t rb[MAXRES];
+ struct fta_alloc_instance_arg * a;
+ struct fta_result * fr = (struct fta_result *)rb;
+ struct ringbuf *r;
+
+ /* make sure we have the share memory required */
+ if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {
+ gslog(LOG_EMERG,"ERROR:could not allocate shared memory"
+ "for FTA %s\n",name);
+ return -1;
+ }
+
+ if (strlen(name)>=(MAXFTANAME-1)) {
+ gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+ return -1;
+ }
+
+ if (strlen(schema)>=(MAXSCHEMASZ-1)) {
+ gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+ return -1;
+ }
+
+ a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
+
+ a->h.callid = FTA_ALLOC_INSTANCE;
+ a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
+ a->f=*ftaid;
+ a->subscriber=subscriber;
+ a->reusable=reusable;
+ a->command = command;
+ a->sz = sz;
+ memcpy(&a->data[0],data,sz);
+ strcpy(a->name,name);
+ strcpy(a->schema,schema);
+
+ ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
+
+ if (fr->h.callid != FTA_RESULT) {
+ gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+ return -1;
+ }
+
+ *ftaid=fr->f;
+
+ if (fr->result==0) {
+ gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
+ return streamregistry_add(*ftaid,r);
+ }
+
+ return fr->result;
+}
+
+gs_retval_t fta_alloc_print_instance(FTAID subscriber,
+ FTAID * ftaid,
+ FTAname name, gs_sp_t schema, gs_uint32_t reusable,
+ gs_int32_t command, gs_int32_t sz, void * data,
+ gs_sp_t path,gs_sp_t basename,
+ gs_sp_t temporal_field, gs_sp_t split_field,
+ gs_uint32_t delta, gs_uint32_t split)
+{
+ gs_int8_t rb[MAXRES];
+ struct fta_alloc_instance_arg * a;
+ struct fta_result * fr = (struct fta_result *)rb;
+
+ if ((strlen(path)>=MAXPRINTSTRING-1)
+ || (strlen(basename)>=MAXPRINTSTRING-1)
+ || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {
+ gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"
+ " arguments to long\n");
+ return -1;
+ }
+ if (strlen(name)>=(MAXFTANAME-1)) {
+ gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+ return -1;
+ }
+
+ if (strlen(schema)>=(MAXSCHEMASZ-1)) {
+ gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
+ return -1;
+ }
+
+ a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
+
+ a->h.callid = FTA_ALLOC_PRINT_INSTANCE;
+ a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
+ a->f=*ftaid;
+ a->subscriber=subscriber;
+ a->reusable=reusable;
+ a->split=split;
+ strcpy(a->name,name);
+ strcpy(a->schema,schema);
+ a->command = command;
+ a->sz = sz;
+ strcpy(a->path,path);
+ strcpy(a->basename,basename);
+ strcpy(a->temporal_field,temporal_field);
+ strcpy(a->split_field,split_field);
+ a->delta=delta;
+ memcpy(&a->data[0],data,sz);
+
+ ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
+
+ if (fr->h.callid != FTA_RESULT) {
+ gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+ return -1;
+ }
+
+ *ftaid=fr->f;
+
+ return fr->result;
+}
+
+gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t recursive)
+{
+ gs_int8_t rb[MAXRES];
+ struct fta_free_instance_arg a;
+ struct standard_result * sr = (struct standard_result *)rb;
+ struct ringbuf *r;
+
+ a.h.callid = FTA_FREE_INSTANCE;
+ a.h.size = sizeof(struct fta_free_instance_arg);
+ a.subscriber=subscriber;
+ a.f=ftaid;
+ a.recursive=recursive;
+ ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);
+ if (sr->h.callid != STANDARD_RESULT) {
+ gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+ return -1;
+ }
+
+ /* make sure we remove the mapping*/
+ streamregistry_remove(ftaid);
+
+ return sr->result;
+}
+
+gs_retval_t fta_control(FTAID subscriber,
+ FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
+{
+ gs_int8_t rb[MAXRES];
+ struct fta_control_arg * a;
+ struct standard_result * sr = (struct standard_result *)rb;
+
+ a = alloca(sizeof(struct fta_control_arg) + sz);
+
+ a->h.callid = FTA_CONTROL;
+ a->h.size = sizeof(struct fta_control_arg)+ sz;
+ a->subscriber=subscriber;
+ a->f=ftaid;
+ a->command = command;
+ a->sz = sz;
+ memcpy(&a->data[0],value,sz);
+
+ ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
+
+ if (sr->h.callid != STANDARD_RESULT) {
+ gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+ return -1;
+ }
+
+ return sr->result;
+}
+
+gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,
+ gs_uint32_t sz, fta_stat * trace){
+#ifdef CLEARINGHOUSE_HEARTBEAT
+ struct fta_heartbeat_arg * a;
+ a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));
+ a->h.callid = FTA_HEARTBEAT;
+ a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));
+ a->sender=self;
+ a->trace_id=trace_id;
+ a->sz=sz;
+ if (sz!=0) {
+ memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));
+ }
+#ifdef PRINTMSG
+ fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "
+ "type %u with length %u\n",
+ (clearinghouseftaid.ip>>24)&0xff,
+ (clearinghouseftaid.ip>>16)&0xff,
+ (clearinghouseftaid.ip>>8)&0xff,
+ (clearinghouseftaid.ip)&0xff,
+ clearinghouseftaid.port,a->h.callid,a->h.size);
+#endif
+ if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {
+ gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+ return -1;
+ }
+#endif
+ return 0;
+}
+
+gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){
+ struct fta_notify_producer_failure_arg a;
+ a.h.callid = FTA_PRODUCER_FAILURE;
+ a.h.size = sizeof(struct fta_notify_producer_failure_arg);
+ a.sender=self;
+ a.producer=producer;
+#ifdef PRINTMSG
+ fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "
+ "type %u with length %u\n",
+ (clearinghouseftaid.ip>>24)&0xff,
+ (clearinghouseftaid.ip>>16)&0xff,
+ (clearinghouseftaid.ip>>8)&0xff,
+ (clearinghouseftaid.ip)&0xff,
+ clearinghouseftaid.port,a.h.callid,a.h.size);
+#endif
+ if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {
+ gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+ return -1;
+ }
+ return 0;
+}
+
+gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
+{
+ gs_int8_t rb[MAXRES];
+ struct process_control_arg * a;
+ struct standard_result * sr = (struct standard_result *)rb;
+
+
+ a = alloca(sizeof(struct process_control_arg) + sz);
+
+ a->h.callid = PROCESS_CONTROL;
+ a->h.size = sizeof(struct process_control_arg)+ sz;
+ a->command = command;
+ a->sz = sz;
+ memcpy(&a->data[0],value,sz);
+
+ ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
+
+ if (sr->h.callid != STANDARD_RESULT) {
+ gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
+ return -1;
+ }
+
+ return sr->result;
+}
+
+
+static void timeouthandler ()
+{
+ struct timeout_result a;
+
+ a.h.callid=TIMEOUT;
+ a.h.size=sizeof(struct timeout_result);
+ if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {
+ gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
+ }
+}
+
+gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,
+ gs_int32_t tbuf_size, gs_int32_t timeout)
+{
+ struct ringbuf * r;
+ FTAID from;
+ gs_int32_t length;
+ gs_int8_t buf[MAXMSGSZ];
+ gs_int32_t lopp;
+ FTAID * f;
+ static gs_uint64_t s1=0;
+ static gs_uint64_t s2;
+ if (s1==0) {
+ s1=rdtsc();
+ }
+ s2=rdtsc();
+ cycles+=(s2-s1);
+start:
+#ifdef PRINTMSG
+ fprintf(stderr,"CHECK RINGBUFS\n");
+#endif
+#ifndef POLLING
+ /* use chance to cleanout message queue no reason
+ to keep anything else */
+ while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
+#endif
+
+ streamregistry_getactiveringbuf_reset();
+ while ((r=streamregistry_getactiveringbuf())>0) {
+#ifdef PRINTMSG
+ fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
+ "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
+ r->length);
+ if (UNREAD(r)) {
+ fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
+ (CURREAD(r)->f.ip>>24)&0xff,
+ (CURREAD(r)->f.ip>>16)&0xff,
+ (CURREAD(r)->f.ip>>8)&0xff,
+ (CURREAD(r)->f.ip)&0xff,
+ CURREAD(r)->f.port,
+ CURREAD(r)->f.streamid,
+ CURREAD(r)->sz);
+ }
+
+#endif
+ if (UNREAD(r)) {
+ *ftaid=(CURREAD(r)->f);
+ *size=CURREAD(r)->sz;
+ memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
+ intuple++;
+ inbytes+=CURREAD(r)->sz;
+ ADVANCEREAD(r);
+ s1=rdtsc();
+ return 0;
+ }
+ }
+ if (timeout == -1) {
+ *size=0;
+ s1=rdtsc();
+ return 1;
+ }
+ if (timeout !=0) {
+ signal(SIGALRM, timeouthandler);
+ alarm(timeout);
+ }
+
+#ifndef POLLING
+#ifdef PRINTMSG
+ fprintf(stderr,"START BLOCKCALLS\n");
+#endif
+ streamregistry_getactiveftaid_reset();
+ while ((f=streamregistry_getactiveftaid())!=0) {
+ struct gscp_get_buffer_arg a;
+ a.h.callid = GSCP_GET_BUFFER;
+ a.h.size = sizeof(struct gscp_get_buffer_arg);
+ a.timeout = timeout;
+#ifdef PRINTMSG
+ fprintf(stderr,"Waiting for %u.%u.%u.%u:%u\n",
+ (f->ip>>24)&0xff,
+ (f->ip>>16)&0xff,
+ (f->ip>>8)&0xff,
+ (f->ip)&0xff,
+ f->port
+ );
+#endif
+ if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
+ s1=rdtsc();
+ return -1;
+ }
+ }
+#ifdef PRINTMSG
+ fprintf(stderr,"BLOCK\n");
+#endif
+ while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {
+#else // If we poll we return after 100 msec
+ sleepagain:
+ while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {
+#endif
+ struct standard_result * sr = (struct standard_result *) buf;
+#ifdef PRINTMSG
+ fprintf(stderr,"Got return code %u\n",sr->h.callid);
+#endif
+ if (lopp==FTACALLBACK) {
+ if (timeout != 0) {
+ signal(SIGALRM, SIG_IGN);
+ }
+ if (sr->h.callid == WAKEUP) {
+ /* use chance to cleanout message queue no reason
+ to keep anything else */
+ while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
+ goto start;
+ }
+ if (sr->h.callid == TIMEOUT) {
+ /* use chance to cleanout message queue no reason
+ to keep anything else */
+ while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
+ *size=0;
+ s1=rdtsc();
+ return 1;
+ }
+ if (sidequeue_append(from,buf,length)<0) {
+ gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
+ s1=rdtsc();
+ return -1;
+ }
+ }
+ }
+#ifdef POLLING
+ streamregistry_getactiveringbuf_reset();
+ while ((r=streamregistry_getactiveringbuf())>0) {
+#ifdef PRINTMSG
+ fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
+ "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
+ r->length);
+ if (UNREAD(r)) {
+ fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
+ (CURREAD(r)->f.ip>>24)&0xff,
+ (CURREAD(r)->f.ip>>16)&0xff,
+ (CURREAD(r)->f.ip>>8)&0xff,
+ (CURREAD(r)->f.ip)&0xff,
+ CURREAD(r)->f.port,
+ CURREAD(r)->f.streamid,
+ CURREAD(r)->sz);
+ }
+
+#endif
+ if (UNREAD(r)) {
+ *ftaid=(CURREAD(r)->f);
+ *size=CURREAD(r)->sz;
+ memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
+ intuple++;
+ inbytes+=CURREAD(r)->sz;
+ ADVANCEREAD(r);
+ if (timeout != 0) {
+ signal(SIGALRM, SIG_IGN);
+ }
+ s1=rdtsc();
+ return 0;
+ }
+ }
+ goto sleepagain; // Try again
+#endif
+ gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");
+ /* we should never get here */
+ s1=rdtsc();
+ return -1;
+ }
+
+
+