X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Flib%2Fgscphost%2Flappinterface.c;fp=src%2Flib%2Fgscphost%2Flappinterface.c;h=99196b8db03223d31c35f948d0553796ed32e47f;hb=c9783d8ea8b85d810483559e50dbf2297109e349;hp=0000000000000000000000000000000000000000;hpb=2f2369dfc58997659b3007b1cea68ad6bfc49a90;p=com%2Fgs-lite.git diff --git a/src/lib/gscphost/lappinterface.c b/src/lib/gscphost/lappinterface.c new file mode 100644 index 0000000..99196b8 --- /dev/null +++ b/src/lib/gscphost/lappinterface.c @@ -0,0 +1,608 @@ +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "rdtsc.h" +// If POLLING is defined applications poll every 100 msec instead of blocking +#define POLLING + +struct processtate curprocess = {0,0,0,255,0}; +struct FTAID clearinghouseftaid = {0,0,0,0}; + +/* + * sends the message passed in buf and waits for a result + * if a message returned is not a result it is put in the + * request queue. The resultsbuf has to be large enough + * for the largest result + */ +gs_retval_t ipc_call_and_wait(FTAID f, gs_sp_t msg, gs_sp_t result) +{ + struct hostcall * h = (struct hostcall *) msg; + gs_int8_t buf[MAXMSGSZ]; + FTAID from; + gs_int32_t length; + gs_int32_t lowop; +#ifdef PRINTMSG + fprintf(stderr, "HOST sending to %u.%u.%u.%u:%u of " + "type %u with length %u\n", + (f.ip>>24)&0xff, + (f.ip>>16)&0xff, + (f.ip>>8)&0xff, + (f.ip)&0xff, + f.port,h->callid,h->size); +#endif + if (gscpipc_send(f,FTACALLBACK,msg,h->size,1)<0) { + gslog(LOG_EMERG,"ERROR:Could not send on message queue\n"); + return -1; + } + h=(struct hostcall *) buf; + while (gscpipc_read(&from,&lowop,buf,&length,1)>0) { +#ifdef PRINTMSG + fprintf(stderr, "HOST response from %u.%u.%u.%u:%u" + " of type %u with length %u\n", + (from.ip>>24)&0xff, + (from.ip>>16)&0xff, + (from.ip>>8)&0xff, + (from.ip)&0xff, + from.port, + h->callid,h->size); +#endif + if ((lowop == FTACALLBACK) && (h->callid < RESULT_OPCODE_IGNORE)) { + h=(struct hostcall *) buf; + if (h->callid > RESULT_OPCODE_BASE) { + memcpy(result,buf,length); + return 0; + } + if (sidequeue_append(from,buf,length)<0) { + gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n"); + return -1; + } + } + } + gslog(LOG_EMERG, "ERROR::gscipc_read failed in ipc_call_and_wait\n"); + return -1; +} + + +gs_retval_t hostlib_init(gs_int32_t type, gs_int32_t buffersize, gs_int32_t deviceid, gs_int32_t mapcnt, gs_sp_t map[]) +{ + FILE * f; + + if (curprocess.active != 0 ) { + return -1; + } + + switch (type) { + case CLEARINGHOUSE: + if (gscpipc_init(1) < 0) { + gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for " + "clearinghouse process\n"); + return -1; + } + break; + case LFTA: +#ifdef __linux__ + mlockall(MCL_CURRENT|MCL_FUTURE); +#endif + case APP: + case HFTA: + if (gscpipc_init(0) < 0) { + gslog(LOG_EMERG,"ERROR:Could not initalize comm layer for " + "non clearinghouse process\n"); + return -1; + } + break; + default: + gslog(LOG_EMERG,"ERROR:Unknown process type\n"); + return -1; + } + + // if the buffersize is zero then allocating shared memory + // will fail. So only use it for the clearinghouse and LFTAs + if ((buffersize<(4*MAXTUPLESZ)) && (buffersize!=0)) { + gslog(LOG_EMERG, + "ERROR:buffersize in hostlib_init has to " + "be at least %u Bytes long\n", + 4*MAXTUPLESZ); + return -1; + } + + curprocess.type=type; + curprocess.buffersize=buffersize; + curprocess.active = 1; + curprocess.deviceid=deviceid; + curprocess.mapcnt=mapcnt; + curprocess.map=map; + return 0; +} + +void hostlib_free() +{ + if (curprocess.active != 1 ) { + return; + } + curprocess.active = 0; + gscpipc_free(); +} + + +gs_retval_t fta_find(FTAname name, gs_uint32_t reuse, FTAID *ftaid, + gs_sp_t schema, gs_int32_t schemasz) +{ + gs_int8_t rb[MAXRES]; + struct fta_find_arg a; + struct ftafind_result * sr = (struct ftafind_result *)rb; + + a.h.callid = FTA_LOOKUP; + a.h.size = sizeof(struct fta_find_arg); + a.reuse=reuse; + if (strlen(name)>=(MAXFTANAME-1)) { + gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name); + return -1; + } + strcpy(a.name,name); + ipc_call_and_wait(clearinghouseftaid,(gs_sp_t )&a,rb); + if (sr->h.callid != FTAFIND_RESULT) { + gslog(LOG_EMERG,"ERROR:Wrong result code received in fta_find\n"); + return -1; + } + if (sr->result >= 0) { + if (schema !=0) { + if (strlen(sr->schema) >= schemasz) { + gslog(LOG_EMERG,"Could not fit schema into schema buffer fta_find\n"); + return -1; + } else { + strcpy(schema,sr->schema); + } + } + *ftaid=sr->f; + } + return sr->result; +} + +gs_retval_t fta_alloc_instance(FTAID subscriber, + FTAID * ftaid, FTAname name, gs_sp_t schema, + gs_uint32_t reusable, + gs_int32_t command, gs_int32_t sz, void * data) +{ + gs_int8_t rb[MAXRES]; + struct fta_alloc_instance_arg * a; + struct fta_result * fr = (struct fta_result *)rb; + struct ringbuf *r; + + /* make sure we have the share memory required */ + if ((r=gscpipc_createshm(*ftaid,curprocess.buffersize))==0) { + gslog(LOG_EMERG,"ERROR:could not allocate shared memory" + "for FTA %s\n",name); + return -1; + } + + if (strlen(name)>=(MAXFTANAME-1)) { + gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name); + return -1; + } + + if (strlen(schema)>=(MAXSCHEMASZ-1)) { + gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name); + return -1; + } + + a = alloca(sizeof(struct fta_alloc_instance_arg) + sz); + + a->h.callid = FTA_ALLOC_INSTANCE; + a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz; + a->f=*ftaid; + a->subscriber=subscriber; + a->reusable=reusable; + a->command = command; + a->sz = sz; + memcpy(&a->data[0],data,sz); + strcpy(a->name,name); + strcpy(a->schema,schema); + + ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb); + + if (fr->h.callid != FTA_RESULT) { + gslog(LOG_EMERG,"ERROR:Wrong result code received\n"); + return -1; + } + + *ftaid=fr->f; + + if (fr->result==0) { + gslog(LOG_INFO,"Allocated fta instance %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid); + return streamregistry_add(*ftaid,r); + } + + return fr->result; +} + +gs_retval_t fta_alloc_print_instance(FTAID subscriber, + FTAID * ftaid, + FTAname name, gs_sp_t schema, gs_uint32_t reusable, + gs_int32_t command, gs_int32_t sz, void * data, + gs_sp_t path,gs_sp_t basename, + gs_sp_t temporal_field, gs_sp_t split_field, + gs_uint32_t delta, gs_uint32_t split) +{ + gs_int8_t rb[MAXRES]; + struct fta_alloc_instance_arg * a; + struct fta_result * fr = (struct fta_result *)rb; + + if ((strlen(path)>=MAXPRINTSTRING-1) + || (strlen(basename)>=MAXPRINTSTRING-1) + || (strlen(temporal_field)>=MAXPRINTSTRING-1)) { + gslog(LOG_EMERG,"INTERNAL ERROR:fta_alloc_print_instance string" + " arguments to long\n"); + return -1; + } + if (strlen(name)>=(MAXFTANAME-1)) { + gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name); + return -1; + } + + if (strlen(schema)>=(MAXSCHEMASZ-1)) { + gslog(LOG_EMERG,"ERROR:FTA name (%s) to large\n",name); + return -1; + } + + a = alloca(sizeof(struct fta_alloc_instance_arg) + sz); + + a->h.callid = FTA_ALLOC_PRINT_INSTANCE; + a->h.size = sizeof(struct fta_alloc_instance_arg)+ sz; + a->f=*ftaid; + a->subscriber=subscriber; + a->reusable=reusable; + a->split=split; + strcpy(a->name,name); + strcpy(a->schema,schema); + a->command = command; + a->sz = sz; + strcpy(a->path,path); + strcpy(a->basename,basename); + strcpy(a->temporal_field,temporal_field); + strcpy(a->split_field,split_field); + a->delta=delta; + memcpy(&a->data[0],data,sz); + + ipc_call_and_wait(*ftaid,(gs_sp_t )a,rb); + + if (fr->h.callid != FTA_RESULT) { + gslog(LOG_EMERG,"ERROR:Wrong result code received\n"); + return -1; + } + + *ftaid=fr->f; + + return fr->result; +} + +gs_retval_t fta_free_instance(FTAID subscriber, FTAID ftaid, gs_uint32_t recursive) +{ + gs_int8_t rb[MAXRES]; + struct fta_free_instance_arg a; + struct standard_result * sr = (struct standard_result *)rb; + struct ringbuf *r; + + a.h.callid = FTA_FREE_INSTANCE; + a.h.size = sizeof(struct fta_free_instance_arg); + a.subscriber=subscriber; + a.f=ftaid; + a.recursive=recursive; + ipc_call_and_wait(ftaid,(gs_sp_t )&a,rb); + if (sr->h.callid != STANDARD_RESULT) { + gslog(LOG_EMERG,"ERROR:Wrong result code received\n"); + return -1; + } + + /* make sure we remove the mapping*/ + streamregistry_remove(ftaid); + + return sr->result; +} + +gs_retval_t fta_control(FTAID subscriber, + FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value) +{ + gs_int8_t rb[MAXRES]; + struct fta_control_arg * a; + struct standard_result * sr = (struct standard_result *)rb; + + a = alloca(sizeof(struct fta_control_arg) + sz); + + a->h.callid = FTA_CONTROL; + a->h.size = sizeof(struct fta_control_arg)+ sz; + a->subscriber=subscriber; + a->f=ftaid; + a->command = command; + a->sz = sz; + memcpy(&a->data[0],value,sz); + + ipc_call_and_wait(ftaid,(gs_sp_t )a,rb); + + if (sr->h.callid != STANDARD_RESULT) { + gslog(LOG_EMERG,"ERROR:Wrong result code received\n"); + return -1; + } + + return sr->result; +} + +gs_retval_t fta_heartbeat(FTAID self,gs_uint64_t trace_id, + gs_uint32_t sz, fta_stat * trace){ +#ifdef CLEARINGHOUSE_HEARTBEAT + struct fta_heartbeat_arg * a; + a = alloca(sizeof(struct fta_heartbeat_arg) + (sz*sizeof(fta_stat))); + a->h.callid = FTA_HEARTBEAT; + a->h.size = sizeof(struct fta_heartbeat_arg)+(sz*sizeof(fta_stat)); + a->sender=self; + a->trace_id=trace_id; + a->sz=sz; + if (sz!=0) { + memcpy(&a->data[0],trace,(sz*sizeof(fta_stat))); + } +#ifdef PRINTMSG + fprintf(stderr, "HOST sending heartbeat to %u.%u.%u.%u:%u of " + "type %u with length %u\n", + (clearinghouseftaid.ip>>24)&0xff, + (clearinghouseftaid.ip>>16)&0xff, + (clearinghouseftaid.ip>>8)&0xff, + (clearinghouseftaid.ip)&0xff, + clearinghouseftaid.port,a->h.callid,a->h.size); +#endif + if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)a,a->h.size,1)<0) { + gslog(LOG_EMERG,"ERROR:Could not send on message queue\n"); + return -1; + } +#endif + return 0; +} + +gs_retval_t fta_notify_producer_failure(FTAID self, FTAID producer){ + struct fta_notify_producer_failure_arg a; + a.h.callid = FTA_PRODUCER_FAILURE; + a.h.size = sizeof(struct fta_notify_producer_failure_arg); + a.sender=self; + a.producer=producer; +#ifdef PRINTMSG + fprintf(stderr, "HOST sending producer failure to %u.%u.%u.%u:%u of " + "type %u with length %u\n", + (clearinghouseftaid.ip>>24)&0xff, + (clearinghouseftaid.ip>>16)&0xff, + (clearinghouseftaid.ip>>8)&0xff, + (clearinghouseftaid.ip)&0xff, + clearinghouseftaid.port,a.h.callid,a.h.size); +#endif + if (gscpipc_send(clearinghouseftaid,FTACALLBACK,(gs_sp_t)&a,a.h.size,1)<0) { + gslog(LOG_EMERG,"ERROR:Could not send on message queue\n"); + return -1; + } + return 0; +} + +gs_retval_t process_control(FTAID ftaid, gs_int32_t command, gs_int32_t sz, void * value) +{ + gs_int8_t rb[MAXRES]; + struct process_control_arg * a; + struct standard_result * sr = (struct standard_result *)rb; + + + a = alloca(sizeof(struct process_control_arg) + sz); + + a->h.callid = PROCESS_CONTROL; + a->h.size = sizeof(struct process_control_arg)+ sz; + a->command = command; + a->sz = sz; + memcpy(&a->data[0],value,sz); + + ipc_call_and_wait(ftaid,(gs_sp_t )a,rb); + + if (sr->h.callid != STANDARD_RESULT) { + gslog(LOG_EMERG,"ERROR:Wrong result code received\n"); + return -1; + } + + return sr->result; +} + + +static void timeouthandler () +{ + struct timeout_result a; + + a.h.callid=TIMEOUT; + a.h.size=sizeof(struct timeout_result); + if (gscpipc_send(gscpipc_getftaid(), FTACALLBACK, (gs_sp_t )&a,a.h.size,1)<0) { + gslog(LOG_EMERG,"ERROR:Could not send on message queue\n"); + } +} + +gs_retval_t gscp_get_buffer(FTAID * ftaid, gs_int32_t * size, void *tbuffer, + gs_int32_t tbuf_size, gs_int32_t timeout) +{ + struct ringbuf * r; + FTAID from; + gs_int32_t length; + gs_int8_t buf[MAXMSGSZ]; + gs_int32_t lopp; + FTAID * f; + static gs_uint64_t s1=0; + static gs_uint64_t s2; + if (s1==0) { + s1=rdtsc(); + } + s2=rdtsc(); + cycles+=(s2-s1); +start: +#ifdef PRINTMSG + fprintf(stderr,"CHECK RINGBUFS\n"); +#endif +#ifndef POLLING + /* use chance to cleanout message queue no reason + to keep anything else */ + while (gscpipc_read(&from,&lopp,buf,&length,0)>0); +#endif + + streamregistry_getactiveringbuf_reset(); + while ((r=streamregistry_getactiveringbuf())>0) { +#ifdef PRINTMSG + fprintf(stderr,"Reading from ringpuffer %p [%p:%u]" + "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer, + r->length); + if (UNREAD(r)) { + fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next, + (CURREAD(r)->f.ip>>24)&0xff, + (CURREAD(r)->f.ip>>16)&0xff, + (CURREAD(r)->f.ip>>8)&0xff, + (CURREAD(r)->f.ip)&0xff, + CURREAD(r)->f.port, + CURREAD(r)->f.streamid, + CURREAD(r)->sz); + } + +#endif + if (UNREAD(r)) { + *ftaid=(CURREAD(r)->f); + *size=CURREAD(r)->sz; + memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size); + intuple++; + inbytes+=CURREAD(r)->sz; + ADVANCEREAD(r); + s1=rdtsc(); + return 0; + } + } + if (timeout == -1) { + *size=0; + s1=rdtsc(); + return 1; + } + if (timeout !=0) { + signal(SIGALRM, timeouthandler); + alarm(timeout); + } + +#ifndef POLLING +#ifdef PRINTMSG + fprintf(stderr,"START BLOCKCALLS\n"); +#endif + streamregistry_getactiveftaid_reset(); + while ((f=streamregistry_getactiveftaid())!=0) { + struct gscp_get_buffer_arg a; + a.h.callid = GSCP_GET_BUFFER; + a.h.size = sizeof(struct gscp_get_buffer_arg); + a.timeout = timeout; +#ifdef PRINTMSG + fprintf(stderr,"Waiting for %u.%u.%u.%u:%u\n", + (f->ip>>24)&0xff, + (f->ip>>16)&0xff, + (f->ip>>8)&0xff, + (f->ip)&0xff, + f->port + ); +#endif + if (gscpipc_send(*f,FTACALLBACK,(gs_sp_t )&a,a.h.size,1)<0) { + s1=rdtsc(); + return -1; + } + } +#ifdef PRINTMSG + fprintf(stderr,"BLOCK\n"); +#endif + while (gscpipc_read(&from,&lopp,buf,&length,1)>0) { +#else // If we poll we return after 100 msec + sleepagain: + while (gscpipc_read(&from,&lopp,buf,&length,2)>0) { +#endif + struct standard_result * sr = (struct standard_result *) buf; +#ifdef PRINTMSG + fprintf(stderr,"Got return code %u\n",sr->h.callid); +#endif + if (lopp==FTACALLBACK) { + if (timeout != 0) { + signal(SIGALRM, SIG_IGN); + } + if (sr->h.callid == WAKEUP) { + /* use chance to cleanout message queue no reason + to keep anything else */ + while (gscpipc_read(&from,&lopp,buf,&length,0)>0); + goto start; + } + if (sr->h.callid == TIMEOUT) { + /* use chance to cleanout message queue no reason + to keep anything else */ + while (gscpipc_read(&from,&lopp,buf,&length,0)>0); + *size=0; + s1=rdtsc(); + return 1; + } + if (sidequeue_append(from,buf,length)<0) { + gslog(LOG_EMERG,"ERROR:: Could not add to sidequeue\n"); + s1=rdtsc(); + return -1; + } + } + } +#ifdef POLLING + streamregistry_getactiveringbuf_reset(); + while ((r=streamregistry_getactiveringbuf())>0) { +#ifdef PRINTMSG + fprintf(stderr,"Reading from ringpuffer %p [%p:%u]" + "(%u %u %u) \n",r,&r->start,r->end,r->reader,r->writer, + r->length); + if (UNREAD(r)) { + fprintf(stderr,"\t%u %u.%u.%u.%u:%u-%p %u\n",CURREAD(r)->next, + (CURREAD(r)->f.ip>>24)&0xff, + (CURREAD(r)->f.ip>>16)&0xff, + (CURREAD(r)->f.ip>>8)&0xff, + (CURREAD(r)->f.ip)&0xff, + CURREAD(r)->f.port, + CURREAD(r)->f.streamid, + CURREAD(r)->sz); + } + +#endif + if (UNREAD(r)) { + *ftaid=(CURREAD(r)->f); + *size=CURREAD(r)->sz; + memcpy(tbuffer,&(CURREAD(r)->data[0]),(tbuf_size>(*size))?(*size):tbuf_size); + intuple++; + inbytes+=CURREAD(r)->sz; + ADVANCEREAD(r); + if (timeout != 0) { + signal(SIGALRM, SIG_IGN); + } + s1=rdtsc(); + return 0; + } + } + goto sleepagain; // Try again +#endif + gslog(LOG_EMERG,"Unexpected code reached in: gscp_get_buffer \n"); + /* we should never get here */ + s1=rdtsc(); + return -1; + } + + +