1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
16 #include <ipcencoding.h>
20 #include <lappregistries.h>
26 // If POLLING is defined applications poll every 100 msec instead of blocking
// Process-wide state of this library instance. hostlib_init stores type,
// buffersize, active, deviceid and mapcnt into it.
// NOTE(review): which member receives the initial 255 depends on the layout
// of struct processtate, which is declared elsewhere - confirm.
29 struct processtate curprocess = {0,0,0,255,0};
// Destination FTAID used in this file for clearinghouse-bound messages
// (fta_find, fta_heartbeat, fta_notify_producer_failure); initialised to zeros.
30 struct FTAID clearinghouseftaid = {0,0,0,0};
33 * Sends the message passed in msg and waits for a result.
34 * If a message returned is not a result it is put in the
35 * side queue (sidequeue_append). The result buffer has to be
36 * large enough for the largest result.
// ipc_call_and_wait: send one request and wait for the matching result.
//   f      - destination FTAID handed to gscpipc_send
//   msg    - request; read as a struct hostcall whose size member gives the
//            number of bytes sent
//   result - caller buffer that receives a copy of the result message
// Messages read while waiting that are not results are handed to
// sidequeue_append.
// NOTE(review): the copy into result uses the received length with no bound
// check in this function. Callers in this file pass rb[MAXRES] while the
// receive buffer holds MAXMSGSZ bytes - confirm MAXRES covers every result.
38 gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result)
40 struct hostcall * h = (struct hostcall *) msg;
41 gs_int8_t buf[MAXMSGSZ];
// Debug trace of the outgoing call (destination port, call id, size).
46 fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of "
47 "type %u with length %u\n",
52 f.port,h->callid,h->size);
// Send h->size bytes of msg as an FTACALLBACK message; a negative return is
// logged as a send failure.
54 if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) {
55 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
// From here on h views the receive buffer instead of the request.
58 h=(struct hostcall *) buf;
// Keep reading (last argument 1) for as long as gscpipc_read returns > 0.
59 while (gscpipc_read(&from,&lowop,buf,&length,1)>0) {
// Debug trace of the incoming message.
61 fprintf(stderr, "HOST response from %u.%u.%u.%u:%u"
62 " of type %u with length %u\n",
// Only FTACALLBACK messages whose call id is below RESULT_OPCODE_IGNORE are
// examined by this branch.
70 if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) {
// NOTE(review): h already points at buf (assigned before the loop), so this
// reassignment looks redundant.
71 h=(struct hostcall *) buf;
// Call ids above RESULT_OPCODE_BASE are treated as results and copied out.
72 if (h->callid > RESULT_OPCODE_BASE) {
73 memcpy(result,buf,length);
// Anything else is queued for later processing.
76 if (sidequeue_append(from,buf,length)<0) {
77 gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
// Presumably reached once gscpipc_read stops returning a positive value.
82 gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n");
// hostlib_init: initialise the communication layer for this process and
// record its configuration in curprocess.
//   type       - process type; presumably selects between the clearinghouse
//                and non-clearinghouse initialisation paths (an unknown type
//                is logged as an error)
//   buffersize - stored in curprocess.buffersize; must be 0 or at least
//                4*MAXTUPLESZ
//   deviceid   - stored in curprocess.deviceid
//   mapcnt     - stored in curprocess.mapcnt
//   map        - NOTE(review): how map is consumed is not evident from the
//                statements below - confirm
87 gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[])
// Guard on curprocess.active, which this function sets to 1 further down.
91 if (curprocess.active != 0 ) {
// gscpipc_init(1): comm layer set-up for the clearinghouse process (per the
// log text).
97 if (gscpipc_init(1) < 0) {
98 gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
99 "clearinghouse process\n");
// Lock current and future pages into memory.
// NOTE(review): the return value of mlockall is not checked here.
105 mlockall(MCL_CURRENT|MCL_FUTURE);
// gscpipc_init(0): comm layer set-up for non-clearinghouse processes.
109 if (gscpipc_init(0) < 0) {
110 gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for "
111 "non clearinghouse process\n");
116 gslog(LOG_EMERG,"ERROR:Unknown process type\n");
120 // if the buffersize is zero then allocating shared memory
121 // will fail. So only use it for the clearinghouse and LFTAs
// A non-zero buffersize below 4*MAXTUPLESZ is rejected.
122 if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) {
124 "ERROR:buffersize in hostlib_init has to "
125 "be at least %u Bytes long\n",
// Record the configuration and mark the library as active.
130 curprocess.type=type;
131 curprocess.buffersize=buffersize;
132 curprocess.active = 1;
133 curprocess.deviceid=deviceid;
134 curprocess.mapcnt=mapcnt;
141 if (curprocess.active != 1 ) {
144 curprocess.active = 0;
// fta_find: ask the clearinghouse to look up an FTA by name.
//   name     - FTA name; rejected when strlen(name) >= MAXFTANAME-1
//   reuse    - NOTE(review): how reuse is forwarded is not evident from the
//              statements below - confirm
//   ftaid    - presumably receives the FTAID found; the assignment is not
//              among the statements below - confirm
//   schema   - caller buffer that receives the schema string on success
//   schemasz - capacity of schema in bytes
149 gs_retval_t fta_find(FTAname name, gs_uint32_t reuse, FTAID *ftaid,
150 gs_sp_t schema, gs_int32_t schemasz)
// rb holds the raw reply; sr views it as an ftafind_result.
152 gs_int8_t rb[MAXRES];
153 struct fta_find_arg a;
154 struct ftafind_result * sr = (struct ftafind_result *)rb;
156 a.h.callid = FTA_LOOKUP;
157 a.h.size = sizeof(struct fta_find_arg);
159 if (strlen(name)>=(MAXFTANAME-1)) {
160 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
// The lookup always goes to the clearinghouse.
// NOTE(review): the return value of ipc_call_and_wait is ignored, so rb may
// be read below without having been filled - confirm.
164 ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb);
165 if (sr->h.callid != FTAFIND_RESULT) {
166 gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n");
// A non-negative result means the lookup succeeded and a schema is present.
169 if (sr->result >= 0) {
// NOTE(review): strlen yields an unsigned size while schemasz is a
// gs_int32_t; assuming that type is signed, a negative schemasz would be
// converted before the comparison - confirm callers never pass one.
171 if (strlen(sr->schema) >= schemasz) {
172 gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n");
175 strcpy(schema,sr->schema);
// fta_alloc_instance: create shared memory for an FTA, ask the process that
// owns *ftaid to allocate an instance, and register the stream locally.
//   subscriber - copied into the request
//   ftaid      - in: destination of the request and key for the shared
//                memory; logged and registered after the call
//   name       - FTA name; rejected when strlen(name) >= MAXFTANAME-1
//   schema     - schema string; rejected when strlen(schema) >= MAXSCHEMASZ-1
//   reusable   - copied into the request
//   command    - copied into the request
//   sz, data   - sz bytes of data are appended to the request
// Returns the value of streamregistry_add on the path that reaches the end.
183 gs_retval_t fta_alloc_instance(FTAID subscriber,
184 FTAID * ftaid, FTAname name, gs_sp_t schema,
185 gs_uint32_t reusable,
186 gs_int32_t command, gs_int32_t sz, void * data)
188 gs_int8_t rb[MAXRES];
189 struct fta_alloc_instance_arg * a;
190 struct fta_result * fr = (struct fta_result *)rb;
193 /* make sure we have the share memory required */
// r keeps what gscpipc_createshm returns; 0 is treated as failure.
// NOTE(review): the two adjacent literals below join without a space
// (memoryfor) in the emitted message.
194 if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) {
195 gslog(LOG_EMERG,"ERROR:could not allocate shared memory"
196 "for FTA %s\n",name);
200 if (strlen(name)>=(MAXFTANAME-1)) {
201 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
// NOTE(review): this check is on schema, yet the message reports the FTA
// name as too large - looks like a copy of the check above.
205 if (strlen(schema)>=(MAXSCHEMASZ-1)) {
206 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
// The request lives on the stack: fixed header plus sz payload bytes.
// NOTE(review): sz is caller supplied and no range check is visible before
// alloca - confirm callers bound it.
210 a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
212 a->h.callid = FTA_ALLOC_INSTANCE;
213 a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
215 a->subscriber=subscriber;
216 a->reusable=reusable;
217 a->command = command;
219 memcpy(&a->data[0],data,sz);
// Safe only because of the two length checks above.
220 strcpy(a->name,name);
221 strcpy(a->schema,schema);
223 ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
225 if (fr->h.callid != FTA_RESULT) {
226 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
233 gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
// Associate the FTAID with the value obtained from gscpipc_createshm.
234 return streamregistry_add(*ftaid,r);
// fta_alloc_print_instance: request an FTA instance using the
// FTA_ALLOC_PRINT_INSTANCE call, passing output-file parameters along with
// the usual allocation arguments.
//   subscriber, reusable, command - copied into the request
//   name, schema                  - length checked, then copied
//   sz, data                      - sz bytes of data appended to the request
//   path, basename, temporal_field, split_field - copied into the request
//   delta, split                  - NOTE(review): their use is not evident
//                                   from the statements below - confirm
240 gs_retval_t fta_alloc_print_instance(FTAID subscriber,
242 FTAname name, gs_sp_t schema, gs_uint32_t reusable,
243 gs_int32_t command, gs_int32_t sz, void * data,
244 gs_sp_t path,gs_sp_t basename,
245 gs_sp_t temporal_field, gs_sp_t split_field,
246 gs_uint32_t delta, gs_uint32_t split)
248 gs_int8_t rb[MAXRES];
249 struct fta_alloc_instance_arg * a;
250 struct fta_result * fr = (struct fta_result *)rb;
// NOTE(review): path, basename and temporal_field are bounded here, but
// split_field is not, although it is copied with strcpy further down.
252 if ((strlen(path)>=MAXPRINTSTRING-1)
253 || (strlen(basename)>=MAXPRINTSTRING-1)
254 || (strlen(temporal_field)>=MAXPRINTSTRING-1)) {
255 gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string"
256 " arguments to long\n");
259 if (strlen(name)>=(MAXFTANAME-1)) {
260 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
// NOTE(review): the check is on schema but the message talks about the name.
264 if (strlen(schema)>=(MAXSCHEMASZ-1)) {
265 gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name);
// Stack-allocated request: fixed header plus sz payload bytes.
269 a = alloca(sizeof(struct fta_alloc_instance_arg) + sz);
271 a->h.callid = FTA_ALLOC_PRINT_INSTANCE;
272 a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz;
274 a->subscriber=subscriber;
275 a->reusable=reusable;
277 strcpy(a->name,name);
278 strcpy(a->schema,schema);
279 a->command = command;
281 strcpy(a->path,path);
282 strcpy(a->basename,basename);
283 strcpy(a->temporal_field,temporal_field);
284 strcpy(a->split_field,split_field);
286 memcpy(&a->data[0],data,sz);
288 ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb);
290 if (fr->h.callid != FTA_RESULT) {
291 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
// fta_free_instance: ask the process that owns ftaid to free an FTA instance
// and drop the local stream mapping for it.
//   subscriber - copied into the request
//   ftaid      - destination of the request and key removed from the registry
//   recursive  - copied into the request
300 gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t recursive)
// rb holds the raw reply; sr views it as a standard_result.
302 gs_int8_t rb[MAXRES];
303 struct fta_free_instance_arg a;
304 struct standard_result * sr = (struct standard_result *)rb;
307 a.h.callid = FTA_FREE_INSTANCE;
308 a.h.size = sizeof(struct fta_free_instance_arg);
309 a.subscriber=subscriber;
311 a.recursive=recursive;
312 ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb);
// Anything other than a STANDARD_RESULT reply is logged as an error.
313 if (sr->h.callid != STANDARD_RESULT) {
314 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
318 /* make sure we remove the mapping*/
319 streamregistry_remove(ftaid);
// fta_control: send a control command with an opaque payload to an FTA.
//   subscriber - copied into the request
//   ftaid      - destination of the request
//   command    - copied into the request
//   sz, value  - sz bytes of value are appended to the request
324 gs_retval_t fta_control(FTAID subscriber,
325 FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
327 gs_int8_t rb[MAXRES];
328 struct fta_control_arg * a;
329 struct standard_result * sr = (struct standard_result *)rb;
// Stack-allocated request: fixed header plus sz payload bytes.
// NOTE(review): sz is not range checked before alloca here.
331 a = alloca(sizeof(struct fta_control_arg) + sz);
333 a->h.callid = FTA_CONTROL;
334 a->h.size = sizeof(struct fta_control_arg)+ sz;
335 a->subscriber=subscriber;
337 a->command = command;
339 memcpy(&a->data[0],value,sz);
341 ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
// Anything other than a STANDARD_RESULT reply is logged as an error.
343 if (sr->h.callid != STANDARD_RESULT) {
344 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
// fta_heartbeat: send a heartbeat carrying sz fta_stat records to the
// clearinghouse. The message is sent with gscpipc_send directly rather than
// through ipc_call_and_wait.
//   self     - NOTE(review): how self is used is not evident from the
//              statements below - confirm
//   trace_id - copied into the request
//   sz       - number of fta_stat entries in trace
//   trace    - array of sz fta_stat entries copied into the request
351 gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id,
352 gs_uint32_t sz, fta_stat * trace){
// The sending code is compiled only when CLEARINGHOUSE_HEARTBEAT is defined.
353 #ifdef CLEARINGHOUSE_HEARTBEAT
354 struct fta_heartbeat_arg * a;
// Stack-allocated request: fixed header plus sz trace records.
355 a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat)));
356 a->h.callid = FTA_HEARTBEAT;
357 a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat));
359 a->trace_id=trace_id;
362 memcpy(&a->data[0],trace,(sz*sizeof(fta_stat)));
// Debug trace of the outgoing heartbeat.
365 fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of "
366 "type %u with length %u\n",
367 (clearinghouseftaid.ip>>24)&0xff,
368 (clearinghouseftaid.ip>>16)&0xff,
369 (clearinghouseftaid.ip>>8)&0xff,
370 (clearinghouseftaid.ip)&0xff,
371 clearinghouseftaid.port,a->h.callid,a->h.size);
// A negative return from gscpipc_send is logged as a send failure.
373 if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) {
374 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
// fta_notify_producer_failure: send an FTA_PRODUCER_FAILURE message to the
// clearinghouse using gscpipc_send directly.
//   self, producer - NOTE(review): presumably copied into the request; those
//                    assignments are not among the statements below - confirm
381 gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){
382 struct fta_notify_producer_failure_arg a;
383 a.h.callid = FTA_PRODUCER_FAILURE;
384 a.h.size = sizeof(struct fta_notify_producer_failure_arg);
// Debug trace of the outgoing notification.
388 fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of "
389 "type %u with length %u\n",
390 (clearinghouseftaid.ip>>24)&0xff,
391 (clearinghouseftaid.ip>>16)&0xff,
392 (clearinghouseftaid.ip>>8)&0xff,
393 (clearinghouseftaid.ip)&0xff,
394 clearinghouseftaid.port,a.h.callid,a.h.size);
// A negative return from gscpipc_send is logged as a send failure.
396 if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) {
397 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
// process_control: send a PROCESS_CONTROL command with an opaque payload to
// the process addressed by ftaid and wait for the reply.
//   ftaid     - destination of the request
//   command   - copied into the request
//   sz, value - sz bytes of value are appended to the request
403 gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value)
405 gs_int8_t rb[MAXRES];
406 struct process_control_arg * a;
407 struct standard_result * sr = (struct standard_result *)rb;
// Stack-allocated request: fixed header plus sz payload bytes.
// NOTE(review): sz is not range checked before alloca here.
410 a = alloca(sizeof(struct process_control_arg) + sz);
412 a->h.callid = PROCESS_CONTROL;
413 a->h.size = sizeof(struct process_control_arg)+ sz;
414 a->command = command;
416 memcpy(&a->data[0],value,sz);
418 ipc_call_and_wait(ftaid,(gs_sp_t )a,rb);
// Anything other than a STANDARD_RESULT reply is logged as an error.
420 if (sr->h.callid != STANDARD_RESULT) {
421 gslog(LOG_EMERG,"ERROR:Wrong result code received\n");
// timeouthandler: SIGALRM handler installed by gscp_get_buffer. It posts a
// timeout_result message to this process's own FTAID (gscpipc_getftaid).
// NOTE(review): the handler calls gscpipc_send and gslog; whether those are
// async-signal-safe is not established here - confirm.
// NOTE(review): the empty parameter list does not spell out the int
// argument that a handler passed to signal receives.
429 static void timeouthandler ()
431 struct timeout_result a;
// NOTE(review): presumably a.h.callid is set to TIMEOUT as well; that
// assignment is not among the statements below - confirm.
434 a.h.size=sizeof(struct timeout_result);
435 if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) {
436 gslog(LOG_EMERG,"ERROR:Could not send on message queue\n");
// gscp_get_buffer: fetch the next tuple from any active ring buffer, waiting
// for a wake-up or timeout message when none is available.
//   ftaid     - out: FTAID stored with the tuple that was read
//   size      - out: size recorded for that tuple (CURREAD(r)->sz)
//   tbuffer   - out: receives at most tbuf_size bytes of tuple data
//   tbuf_size - capacity of tbuffer in bytes
//   timeout   - NOTE(review): presumably drives the SIGALRM timer; its use is
//               not evident from the statements below - confirm
// NOTE(review): when *size exceeds tbuf_size the copy is truncated while
// *size still reports the full tuple size.
440 gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer,
441 gs_int32_t tbuf_size, gs_int32_t timeout)
446 gs_int8_t buf[MAXMSGSZ];
// Counters that persist across calls.
449 static gs_uint64_t s1=0;
450 static gs_uint64_t s2;
458 fprintf(stderr,"CHECK RINGBUFS\n");
461 /* use chance to cleanout message queue no reason
462 to keep anything else */
// Read with last argument 0 until nothing is returned, discarding messages.
463 while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
// First pass over the active ring buffers.
// NOTE(review): r is used as a pointer (r->start, printed with %p) yet is
// compared with > 0; the FTAID loop further down uses != 0.
466 streamregistry_getactiveringbuf_reset();
467 while ((r=streamregistry_getactiveringbuf())>0) {
469 fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
470 "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
473 fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
474 (CURREAD(r)->f.ip>>24)&0xff,
475 (CURREAD(r)->f.ip>>16)&0xff,
476 (CURREAD(r)->f.ip>>8)&0xff,
477 (CURREAD(r)->f.ip)&0xff,
479 CURREAD(r)->f.streamid,
// Hand the current read slot to the caller: source FTAID, size, and the
// smaller of tbuf_size and *size bytes of data.
485 *ftaid=(CURREAD(r)->f);
486 *size=CURREAD(r)->sz;
487 memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
489 inbytes+=CURREAD(r)->sz;
// Install timeouthandler for SIGALRM before waiting.
501 signal(SIGALRM, timeouthandler);
507 fprintf(stderr,"START BLOCKCALLS\n");
// Send a GSCP_GET_BUFFER message to every active FTAID.
509 streamregistry_getactiveftaid_reset();
510 while ((f=streamregistry_getactiveftaid())!=0) {
511 struct gscp_get_buffer_arg a;
512 a.h.callid = GSCP_GET_BUFFER;
513 a.h.size = sizeof(struct gscp_get_buffer_arg);
516 fprintf(stderr,"Waiting for %u.%u.%u.%u:%u\n",
524 if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) {
530 fprintf(stderr,"BLOCK\n");
// Wait for messages: last argument 1 here, 2 in the #else variant, which its
// own comment describes as returning after 100 msec.
532 while (gscpipc_read(&from,&lopp,buf,&length,1)>0) {
533 #else // If we poll we return after 100 msec
535 while (gscpipc_read(&from,&lopp,buf,&length,2)>0) {
// sr views the received bytes as a standard_result to inspect the call id.
537 struct standard_result * sr = (struct standard_result *) buf;
539 fprintf(stderr,"Got return code %u\n",sr->h.callid);
541 if (lopp==FTACALLBACK) {
// Stop reacting to SIGALRM once a callback message has arrived.
543 signal(SIGALRM, SIG_IGN);
545 if (sr->h.callid == WAKEUP) {
546 /* use chance to cleanout message queue no reason
547 to keep anything else */
548 while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
551 if (sr->h.callid == TIMEOUT) {
552 /* use chance to cleanout message queue no reason
553 to keep anything else */
554 while (gscpipc_read(&from,&lopp,buf,&length,0)>0);
// Other messages are kept on the side queue.
559 if (sidequeue_append(from,buf,length)<0) {
560 gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n");
// Second pass over the active ring buffers, same handling as the first.
567 streamregistry_getactiveringbuf_reset();
568 while ((r=streamregistry_getactiveringbuf())>0) {
570 fprintf(stderr,"Reading from ringpuffer %p [%p:%u]"
571 "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer,
574 fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next,
575 (CURREAD(r)->f.ip>>24)&0xff,
576 (CURREAD(r)->f.ip>>16)&0xff,
577 (CURREAD(r)->f.ip>>8)&0xff,
578 (CURREAD(r)->f.ip)&0xff,
580 CURREAD(r)->f.streamid,
586 *ftaid=(CURREAD(r)->f);
587 *size=CURREAD(r)->sz;
588 memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size);
590 inbytes+=CURREAD(r)->sz;
593 signal(SIGALRM, SIG_IGN);
// Jump back to the sleepagain label to wait again.
599 goto sleepagain; // Try again
601 gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n");
602 /* we should never get here */