Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscphost / clearinghouseregistries.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6 \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #include "gsconfig.h"\r
17 #include "gstypes.h"\r
18 #include <lapp.h>\r
19 #include <clearinghouseregistries.h>\r
20 #include <stdio.h>\r
21 #include <stdlib.h>\r
22 #include <string.h>\r
23 #include "gshub.h"\r
24 \r
25 extern const gs_sp_t fta_names[];\r
26 \r
27 \r
28 /* fta lookup registry in the clearinghouse */\r
29 \r
30 \r
31 struct ftalookup {\r
32     gs_int32_t used;\r
33     gs_sp_t name;\r
34     FTAID ftaid;\r
35     gs_uint32_t  reusable;\r
36     gs_sp_t  schema;\r
37 };\r
38 \r
39 struct ftalookup * flookup =0;\r
40 gs_int32_t llookup=0;\r
41 \r
42 \r
43 /* Adds a FTA to the lookup table */\r
44 gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,\r
45                                    FTAname name, gs_uint32_t  reusable,\r
46                                    gs_csp_t  schema)\r
47 {\r
48     gs_int32_t x;\r
49     static gs_int32_t registered_ftas=0;\r
50     endpoint gshub;\r
51     if (get_hub(&gshub)!=0) {\r
52         gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");\r
53         return -1;\r
54     }\r
55     if (llookup == 0) {\r
56         if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {\r
57             gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");\r
58             return -1;\r
59         }\r
60         memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);\r
61         llookup = STARTSZ;\r
62     }\r
63     for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);\r
64     if (x == llookup) {\r
65         gs_int32_t y;\r
66         llookup = 2*llookup;\r
67         if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {\r
68             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");\r
69             return -1;\r
70         }\r
71         for (y=x;y<llookup;y++)\r
72             flookup[y].used=0;\r
73     }\r
74     if (f.streamid==0) {\r
75         gslog(LOG_INFO,"Basic FTA can not be reusable\n");\r
76         reusable=0;\r
77         if (registered_ftas>=0)\r
78             registered_ftas++; // count them to know when everybody is in\r
79     } else {\r
80         // register none basic FTA's with GSHUB\r
81         if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {\r
82             gslog(LOG_EMERG,"ERROR:could not set_ftainstance");\r
83             return -1;\r
84         }\r
85     }\r
86     gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);\r
87     flookup[x].name=strdup(name);\r
88     flookup[x].ftaid=f;\r
89     flookup[x].reusable=reusable;\r
90     flookup[x].schema=strdup(schema);\r
91     flookup[x].used=1;\r
92 \r
93     if (registered_ftas>=0) {\r
94         for(x=0; fta_names[x]!=0;x++ );\r
95         if (x<=registered_ftas) {\r
96             if (set_initinstance(gshub,get_instance_name())!=0) {\r
97                 gslog(LOG_EMERG,"hostlib::error::could not init instance");\r
98                 return -1;\r
99             }\r
100             registered_ftas=-1;\r
101         }\r
102     }\r
103     return 0;\r
104 }\r
105 \r
106 /* Removes the FTA from the lookup table */\r
107 gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)\r
108 {\r
109     gs_int32_t x;\r
110     for(x=0;x<llookup;x++) {\r
111         if ((flookup[x].used==1)\r
112             && (flookup[x].ftaid.streamid)\r
113             && (flookup[x].ftaid.ip)\r
114             && (flookup[x].ftaid.port)\r
115             && (flookup[x].ftaid.index)) {\r
116             flookup[x].used=0;\r
117             free(flookup[x].name);\r
118             free(flookup[x].schema);\r
119         }\r
120     }\r
121     return 0;\r
122 }\r
123 \r
124 /* Looks an FTA up by name */\r
125 gs_retval_t ftalookup_lookup_fta_index(FTAID caller,\r
126                                        FTAname name, gs_uint32_t  reuse, FTAID * ftaid,\r
127                                        gs_csp_t  * schema)\r
128 {\r
129     gs_int32_t x;\r
130 #ifdef PRINTMSG\r
131     fprintf(stderr,"Name %s reusable %u\n",name,reuse);\r
132 #endif\r
133     if (reuse==1) {\r
134         /* grep the firs reusable instance */\r
135         for(x=0;x<llookup;x++) {\r
136             if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)\r
137                 && (flookup[x].reusable>=1)\r
138                 && (flookup[x].ftaid.streamid!=0)) {\r
139                 *ftaid=flookup[x].ftaid;\r
140                 *schema=flookup[x].schema;\r
141 #ifdef PRINTMSG\r
142                 fprintf(stderr,"\tREUSE FTA\n");\r
143 #endif\r
144                 return 0;\r
145             }\r
146         }\r
147     }\r
148     /* grep the first fta with the name not an instance */\r
149     for(x=0;x<llookup;x++) {\r
150         if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)\r
151             && (flookup[x].ftaid.streamid==0)) {\r
152             *ftaid=flookup[x].ftaid;\r
153             *schema=flookup[x].schema;\r
154 #ifdef PRINTMSG\r
155             fprintf(stderr,"\tNEW FTA\n");\r
156 #endif\r
157 \r
158             gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);\r
159             return 0;\r
160         }\r
161     }\r
162 #ifdef PRINTMSG\r
163     fprintf(stderr,"NO MATCH\n");\r
164 #endif\r
165     return -1;\r
166 }\r
167 \r
168 gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {\r
169     return 0;\r
170 }\r
171 \r
172 gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,\r
173                                 gs_uint32_t  sz, fta_stat * trace){\r
174 \r
175         gs_uint32_t i = 0;\r
176     endpoint gshub;\r
177     if (get_hub(&gshub)!=0) {\r
178         gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");\r
179         return -1;\r
180     }\r
181 \r
182     // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1\r
183         // for application heartbeats (streamid=0) we will only send last stat in their traces\r
184     if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {\r
185         if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {\r
186                 gslog(LOG_EMERG,"ERROR:could not set instancestats");\r
187                 return -1;\r
188                 }\r
189 \r
190     }\r
191 \r
192     #ifdef PRINTMSG\r
193         gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);\r
194         for (i = 0; i < sz; ++i) {\r
195                 gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,\r
196               trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,\r
197               trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);\r
198         }\r
199     #endif\r
200     return 0;\r
201 }\r