Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / clearinghouseregistries.c
index 84dd266..0a3386d 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-
-#include "gsconfig.h"
-#include "gstypes.h"
-#include <lapp.h>
-#include <clearinghouseregistries.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include "gshub.h"
-
-extern const gs_sp_t fta_names[];
-
-
-/* fta lookup registry in the clearinghouse */
-
-
-struct ftalookup {
-    gs_int32_t used;
-    gs_sp_t name;
-    FTAID ftaid;
-    gs_uint32_t  reusable;
-    gs_sp_t  schema;
-};
-
-struct ftalookup * flookup =0;
-gs_int32_t llookup=0;
-
-
-/* Adds a FTA to the lookup table */
-gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,
-                                   FTAname name, gs_uint32_t  reusable,
-                                   gs_csp_t  schema)
-{
-    gs_int32_t x;
-    static gs_int32_t registered_ftas=0;
-    endpoint gshub;
-    if (get_hub(&gshub)!=0) {
-        gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");
-        return -1;
-    }
-    if (llookup == 0) {
-        if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {
-            gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");
-            return -1;
-        }
-        memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);
-        llookup = STARTSZ;
-    }
-    for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);
-    if (x == llookup) {
-        gs_int32_t y;
-        llookup = 2*llookup;
-        if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {
-            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
-            return -1;
-        }
-        for (y=x;y<llookup;y++)
-            flookup[y].used=0;
-    }
-    if (f.streamid==0) {
-        gslog(LOG_INFO,"Basic FTA can not be reusable\n");
-        reusable=0;
-        if (registered_ftas>=0)
-            registered_ftas++; // count them to know when everybody is in
-    } else {
-        // register none basic FTA's with GSHUB
-        if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {
-            gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
-            return -1;
-        }
-    }
-    gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);
-    flookup[x].name=strdup(name);
-    flookup[x].ftaid=f;
-    flookup[x].reusable=reusable;
-    flookup[x].schema=strdup(schema);
-    flookup[x].used=1;
-
-    if (registered_ftas>=0) {
-        for(x=0; fta_names[x]!=0;x++ );
-        if (x<=registered_ftas) {
-            if (set_initinstance(gshub,get_instance_name())!=0) {
-                gslog(LOG_EMERG,"hostlib::error::could not init instance");
-                return -1;
-            }
-            registered_ftas=-1;
-        }
-    }
-    return 0;
-}
-
-/* Removes the FTA from the lookup table */
-gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)
-{
-    gs_int32_t x;
-    for(x=0;x<llookup;x++) {
-        if ((flookup[x].used==1)
-            && (flookup[x].ftaid.streamid)
-            && (flookup[x].ftaid.ip)
-            && (flookup[x].ftaid.port)
-            && (flookup[x].ftaid.index)) {
-            flookup[x].used=0;
-            free(flookup[x].name);
-            free(flookup[x].schema);
-        }
-    }
-    return 0;
-}
-
-/* Looks an FTA up by name */
-gs_retval_t ftalookup_lookup_fta_index(FTAID caller,
-                                       FTAname name, gs_uint32_t  reuse, FTAID * ftaid,
-                                       gs_csp_t  * schema)
-{
-    gs_int32_t x;
-#ifdef PRINTMSG
-    fprintf(stderr,"Name %s reusable %u\n",name,reuse);
-#endif
-    if (reuse==1) {
-        /* grep the firs reusable instance */
-        for(x=0;x<llookup;x++) {
-            if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
-                && (flookup[x].reusable>=1)
-                && (flookup[x].ftaid.streamid!=0)) {
-                *ftaid=flookup[x].ftaid;
-                *schema=flookup[x].schema;
-#ifdef PRINTMSG
-                fprintf(stderr,"\tREUSE FTA\n");
-#endif
-                return 0;
-            }
-        }
-    }
-    /* grep the first fta with the name not an instance */
-    for(x=0;x<llookup;x++) {
-        if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
-            && (flookup[x].ftaid.streamid==0)) {
-            *ftaid=flookup[x].ftaid;
-            *schema=flookup[x].schema;
-#ifdef PRINTMSG
-            fprintf(stderr,"\tNEW FTA\n");
-#endif
-
-            gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
-            return 0;
-        }
-    }
-#ifdef PRINTMSG
-    fprintf(stderr,"NO MATCH\n");
-#endif
-    return -1;
-}
-
-gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {
-    return 0;
-}
-
-gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,
-                                gs_uint32_t  sz, fta_stat * trace){
-
-       gs_uint32_t i = 0;
-    endpoint gshub;
-    if (get_hub(&gshub)!=0) {
-        gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");
-        return -1;
-    }
-
-    // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1
-       // for application heartbeats (streamid=0) we will only send last stat in their traces
-    if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {
-        if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {
-               gslog(LOG_EMERG,"ERROR:could not set instancestats");
-               return -1;
-               }
-
-    }
-
-    #ifdef PRINTMSG
-       gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);
-       for (i = 0; i < sz; ++i) {
-               gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,
-              trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,
-              trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);
-       }
-    #endif
-    return 0;
-}
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+\r
+#include "gsconfig.h"\r
+#include "gstypes.h"\r
+#include <lapp.h>\r
+#include <clearinghouseregistries.h>\r
+#include <stdio.h>\r
+#include <stdlib.h>\r
+#include <string.h>\r
+#include "gshub.h"\r
+\r
+extern const gs_sp_t fta_names[];\r
+\r
+\r
+/* fta lookup registry in the clearinghouse */\r
+\r
+\r
+struct ftalookup {\r
+    gs_int32_t used;\r
+    gs_sp_t name;\r
+    FTAID ftaid;\r
+    gs_uint32_t  reusable;\r
+    gs_sp_t  schema;\r
+};\r
+\r
+struct ftalookup * flookup =0;\r
+gs_int32_t llookup=0;\r
+\r
+\r
+/* Adds a FTA to the lookup table */\r
+gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,\r
+                                   FTAname name, gs_uint32_t  reusable,\r
+                                   gs_csp_t  schema)\r
+{\r
+    gs_int32_t x;\r
+    static gs_int32_t registered_ftas=0;\r
+    endpoint gshub;\r
+    if (get_hub(&gshub)!=0) {\r
+        gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");\r
+        return -1;\r
+    }\r
+    if (llookup == 0) {\r
+        if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {\r
+            gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");\r
+            return -1;\r
+        }\r
+        memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);\r
+        llookup = STARTSZ;\r
+    }\r
+    for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);\r
+    if (x == llookup) {\r
+        gs_int32_t y;\r
+        llookup = 2*llookup;\r
+        if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {\r
+            gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
+            return -1;\r
+        }\r
+        for (y=x;y<llookup;y++)\r
+            flookup[y].used=0;\r
+    }\r
+    if (f.streamid==0) {\r
+        gslog(LOG_INFO,"Basic FTA can not be reusable\n");\r
+        reusable=0;\r
+        if (registered_ftas>=0)\r
+            registered_ftas++; // count them to know when everybody is in\r
+    } else {\r
+        // register none basic FTA's with GSHUB\r
+        if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {\r
+            gslog(LOG_EMERG,"ERROR:could not set_ftainstance");\r
+            return -1;\r
+        }\r
+    }\r
+    gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);\r
+    flookup[x].name=strdup(name);\r
+    flookup[x].ftaid=f;\r
+    flookup[x].reusable=reusable;\r
+    flookup[x].schema=strdup(schema);\r
+    flookup[x].used=1;\r
+\r
+    if (registered_ftas>=0) {\r
+        for(x=0; fta_names[x]!=0;x++ );\r
+        if (x<=registered_ftas) {\r
+            if (set_initinstance(gshub,get_instance_name())!=0) {\r
+                gslog(LOG_EMERG,"hostlib::error::could not init instance");\r
+                return -1;\r
+            }\r
+            registered_ftas=-1;\r
+        }\r
+    }\r
+    return 0;\r
+}\r
+\r
+/* Removes the FTA from the lookup table */\r
+gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)\r
+{\r
+    gs_int32_t x;\r
+    for(x=0;x<llookup;x++) {\r
+        if ((flookup[x].used==1)\r
+            && (flookup[x].ftaid.streamid)\r
+            && (flookup[x].ftaid.ip)\r
+            && (flookup[x].ftaid.port)\r
+            && (flookup[x].ftaid.index)) {\r
+            flookup[x].used=0;\r
+            free(flookup[x].name);\r
+            free(flookup[x].schema);\r
+        }\r
+    }\r
+    return 0;\r
+}\r
+\r
+/* Looks an FTA up by name */\r
+gs_retval_t ftalookup_lookup_fta_index(FTAID caller,\r
+                                       FTAname name, gs_uint32_t  reuse, FTAID * ftaid,\r
+                                       gs_csp_t  * schema)\r
+{\r
+    gs_int32_t x;\r
+#ifdef PRINTMSG\r
+    fprintf(stderr,"Name %s reusable %u\n",name,reuse);\r
+#endif\r
+    if (reuse==1) {\r
+        /* grep the firs reusable instance */\r
+        for(x=0;x<llookup;x++) {\r
+            if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)\r
+                && (flookup[x].reusable>=1)\r
+                && (flookup[x].ftaid.streamid!=0)) {\r
+                *ftaid=flookup[x].ftaid;\r
+                *schema=flookup[x].schema;\r
+#ifdef PRINTMSG\r
+                fprintf(stderr,"\tREUSE FTA\n");\r
+#endif\r
+                return 0;\r
+            }\r
+        }\r
+    }\r
+    /* grep the first fta with the name not an instance */\r
+    for(x=0;x<llookup;x++) {\r
+        if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)\r
+            && (flookup[x].ftaid.streamid==0)) {\r
+            *ftaid=flookup[x].ftaid;\r
+            *schema=flookup[x].schema;\r
+#ifdef PRINTMSG\r
+            fprintf(stderr,"\tNEW FTA\n");\r
+#endif\r
+\r
+            gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);\r
+            return 0;\r
+        }\r
+    }\r
+#ifdef PRINTMSG\r
+    fprintf(stderr,"NO MATCH\n");\r
+#endif\r
+    return -1;\r
+}\r
+\r
+gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {\r
+    return 0;\r
+}\r
+\r
+gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,\r
+                                gs_uint32_t  sz, fta_stat * trace){\r
+\r
+       gs_uint32_t i = 0;\r
+    endpoint gshub;\r
+    if (get_hub(&gshub)!=0) {\r
+        gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");\r
+        return -1;\r
+    }\r
+\r
+    // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1\r
+       // for application heartbeats (streamid=0) we will only send last stat in their traces\r
+    if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {\r
+        if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {\r
+               gslog(LOG_EMERG,"ERROR:could not set instancestats");\r
+               return -1;\r
+               }\r
+\r
+    }\r
+\r
+    #ifdef PRINTMSG\r
+       gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);\r
+       for (i = 0; i < sz; ++i) {\r
+               gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,\r
+              trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,\r
+              trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);\r
+       }\r
+    #endif\r
+    return 0;\r
+}\r