-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#include "gsconfig.h"
-#include "gstypes.h"
-#include <lapp.h>
-#include <clearinghouseregistries.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include "gshub.h"
-
-extern const gs_sp_t fta_names[];
-
-
-/* fta lookup registry in the clearinghouse */
-
-
-struct ftalookup {
- gs_int32_t used;
- gs_sp_t name;
- FTAID ftaid;
- gs_uint32_t reusable;
- gs_sp_t schema;
-};
-
-struct ftalookup * flookup =0;
-gs_int32_t llookup=0;
-
-
-/* Adds a FTA to the lookup table */
-gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,
- FTAname name, gs_uint32_t reusable,
- gs_csp_t schema)
-{
- gs_int32_t x;
- static gs_int32_t registered_ftas=0;
- endpoint gshub;
- if (get_hub(&gshub)!=0) {
- gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");
- return -1;
- }
- if (llookup == 0) {
- if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");
- return -1;
- }
- memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);
- llookup = STARTSZ;
- }
- for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);
- if (x == llookup) {
- gs_int32_t y;
- llookup = 2*llookup;
- if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {
- gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
- return -1;
- }
- for (y=x;y<llookup;y++)
- flookup[y].used=0;
- }
- if (f.streamid==0) {
- gslog(LOG_INFO,"Basic FTA can not be reusable\n");
- reusable=0;
- if (registered_ftas>=0)
- registered_ftas++; // count them to know when everybody is in
- } else {
- // register none basic FTA's with GSHUB
- if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {
- gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
- return -1;
- }
- }
- gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);
- flookup[x].name=strdup(name);
- flookup[x].ftaid=f;
- flookup[x].reusable=reusable;
- flookup[x].schema=strdup(schema);
- flookup[x].used=1;
-
- if (registered_ftas>=0) {
- for(x=0; fta_names[x]!=0;x++ );
- if (x<=registered_ftas) {
- if (set_initinstance(gshub,get_instance_name())!=0) {
- gslog(LOG_EMERG,"hostlib::error::could not init instance");
- return -1;
- }
- registered_ftas=-1;
- }
- }
- return 0;
-}
-
-/* Removes the FTA from the lookup table */
-gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)
-{
- gs_int32_t x;
- for(x=0;x<llookup;x++) {
- if ((flookup[x].used==1)
- && (flookup[x].ftaid.streamid)
- && (flookup[x].ftaid.ip)
- && (flookup[x].ftaid.port)
- && (flookup[x].ftaid.index)) {
- flookup[x].used=0;
- free(flookup[x].name);
- free(flookup[x].schema);
- }
- }
- return 0;
-}
-
-/* Looks an FTA up by name */
-gs_retval_t ftalookup_lookup_fta_index(FTAID caller,
- FTAname name, gs_uint32_t reuse, FTAID * ftaid,
- gs_csp_t * schema)
-{
- gs_int32_t x;
-#ifdef PRINTMSG
- fprintf(stderr,"Name %s reusable %u\n",name,reuse);
-#endif
- if (reuse==1) {
- /* grep the firs reusable instance */
- for(x=0;x<llookup;x++) {
- if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
- && (flookup[x].reusable>=1)
- && (flookup[x].ftaid.streamid!=0)) {
- *ftaid=flookup[x].ftaid;
- *schema=flookup[x].schema;
-#ifdef PRINTMSG
- fprintf(stderr,"\tREUSE FTA\n");
-#endif
- return 0;
- }
- }
- }
- /* grep the first fta with the name not an instance */
- for(x=0;x<llookup;x++) {
- if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
- && (flookup[x].ftaid.streamid==0)) {
- *ftaid=flookup[x].ftaid;
- *schema=flookup[x].schema;
-#ifdef PRINTMSG
- fprintf(stderr,"\tNEW FTA\n");
-#endif
-
- gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
- return 0;
- }
- }
-#ifdef PRINTMSG
- fprintf(stderr,"NO MATCH\n");
-#endif
- return -1;
-}
-
-gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {
- return 0;
-}
-
-gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,
- gs_uint32_t sz, fta_stat * trace){
-
- gs_uint32_t i = 0;
- endpoint gshub;
- if (get_hub(&gshub)!=0) {
- gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");
- return -1;
- }
-
- // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1
- // for application heartbeats (streamid=0) we will only send last stat in their traces
- if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {
- if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {
- gslog(LOG_EMERG,"ERROR:could not set instancestats");
- return -1;
- }
-
- }
-
- #ifdef PRINTMSG
- gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);
- for (i = 0; i < sz; ++i) {
- gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,
- trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,
- trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);
- }
- #endif
- return 0;
-}
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include <lapp.h>\r
+#include <clearinghouseregistries.h>\r
+#include <stdio.h>\r
+#include <stdlib.h>\r
+#include <string.h>\r
+#include "gshub.h"\r
+\r
+extern const gs_sp_t fta_names[];\r
+\r
+\r
+/* fta lookup registry in the clearinghouse */\r
+\r
+\r
+struct ftalookup {\r
+ gs_int32_t used;\r
+ gs_sp_t name;\r
+ FTAID ftaid;\r
+ gs_uint32_t reusable;\r
+ gs_sp_t schema;\r
+};\r
+\r
+struct ftalookup * flookup =0;\r
+gs_int32_t llookup=0;\r
+\r
+\r
+/* Adds a FTA to the lookup table */\r
+gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,\r
+ FTAname name, gs_uint32_t reusable,\r
+ gs_csp_t schema)\r
+{\r
+ gs_int32_t x;\r
+ static gs_int32_t registered_ftas=0;\r
+ endpoint gshub;\r
+ if (get_hub(&gshub)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");\r
+ return -1;\r
+ }\r
+ if (llookup == 0) {\r
+ if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");\r
+ return -1;\r
+ }\r
+ memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);\r
+ llookup = STARTSZ;\r
+ }\r
+ for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);\r
+ if (x == llookup) {\r
+ gs_int32_t y;\r
+ llookup = 2*llookup;\r
+ if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {\r
+ gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+ return -1;\r
+ }\r
+ for (y=x;y<llookup;y++)\r
+ flookup[y].used=0;\r
+ }\r
+ if (f.streamid==0) {\r
+ gslog(LOG_INFO,"Basic FTA can not be reusable\n");\r
+ reusable=0;\r
+ if (registered_ftas>=0)\r
+ registered_ftas++; // count them to know when everybody is in\r
+ } else {\r
+ // register none basic FTA's with GSHUB\r
+ if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:could not set_ftainstance");\r
+ return -1;\r
+ }\r
+ }\r
+ gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);\r
+ flookup[x].name=strdup(name);\r
+ flookup[x].ftaid=f;\r
+ flookup[x].reusable=reusable;\r
+ flookup[x].schema=strdup(schema);\r
+ flookup[x].used=1;\r
+\r
+ if (registered_ftas>=0) {\r
+ for(x=0; fta_names[x]!=0;x++ );\r
+ if (x<=registered_ftas) {\r
+ if (set_initinstance(gshub,get_instance_name())!=0) {\r
+ gslog(LOG_EMERG,"hostlib::error::could not init instance");\r
+ return -1;\r
+ }\r
+ registered_ftas=-1;\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* Removes the FTA from the lookup table */\r
+gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)\r
+{\r
+ gs_int32_t x;\r
+ for(x=0;x<llookup;x++) {\r
+ if ((flookup[x].used==1)\r
+ && (flookup[x].ftaid.streamid)\r
+ && (flookup[x].ftaid.ip)\r
+ && (flookup[x].ftaid.port)\r
+ && (flookup[x].ftaid.index)) {\r
+ flookup[x].used=0;\r
+ free(flookup[x].name);\r
+ free(flookup[x].schema);\r
+ }\r
+ }\r
+ return 0;\r
+}\r
+\r
+/* Looks an FTA up by name */\r
+gs_retval_t ftalookup_lookup_fta_index(FTAID caller,\r
+ FTAname name, gs_uint32_t reuse, FTAID * ftaid,\r
+ gs_csp_t * schema)\r
+{\r
+ gs_int32_t x;\r
+#ifdef PRINTMSG\r
+ fprintf(stderr,"Name %s reusable %u\n",name,reuse);\r
+#endif\r
+ if (reuse==1) {\r
+ /* grep the firs reusable instance */\r
+ for(x=0;x<llookup;x++) {\r
+ if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)\r
+ && (flookup[x].reusable>=1)\r
+ && (flookup[x].ftaid.streamid!=0)) {\r
+ *ftaid=flookup[x].ftaid;\r
+ *schema=flookup[x].schema;\r
+#ifdef PRINTMSG\r
+ fprintf(stderr,"\tREUSE FTA\n");\r
+#endif\r
+ return 0;\r
+ }\r
+ }\r
+ }\r
+ /* grep the first fta with the name not an instance */\r
+ for(x=0;x<llookup;x++) {\r
+ if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)\r
+ && (flookup[x].ftaid.streamid==0)) {\r
+ *ftaid=flookup[x].ftaid;\r
+ *schema=flookup[x].schema;\r
+#ifdef PRINTMSG\r
+ fprintf(stderr,"\tNEW FTA\n");\r
+#endif\r
+\r
+ gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);\r
+ return 0;\r
+ }\r
+ }\r
+#ifdef PRINTMSG\r
+ fprintf(stderr,"NO MATCH\n");\r
+#endif\r
+ return -1;\r
+}\r
+\r
+gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {\r
+ return 0;\r
+}\r
+\r
+gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,\r
+ gs_uint32_t sz, fta_stat * trace){\r
+\r
+ gs_uint32_t i = 0;\r
+ endpoint gshub;\r
+ if (get_hub(&gshub)!=0) {\r
+ gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");\r
+ return -1;\r
+ }\r
+\r
+ // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1\r
+ // for application heartbeats (streamid=0) we will only send last stat in their traces\r
+ if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {\r
+ if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {\r
+ gslog(LOG_EMERG,"ERROR:could not set instancestats");\r
+ return -1;\r
+ }\r
+\r
+ }\r
+\r
+ #ifdef PRINTMSG\r
+ gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);\r
+ for (i = 0; i < sz; ++i) {\r
+ gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,\r
+ trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,\r
+ trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);\r
+ }\r
+ #endif\r
+ return 0;\r
+}\r