1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
19 #include <clearinghouseregistries.h>
/* Table of FTA names defined in another translation unit. The registration
 * code below walks it until it reaches a 0 entry, so it is expected to be
 * 0-terminated. */
25 extern const gs_sp_t fta_names[];
28 /* FTA lookup registry in the clearinghouse: a malloc()ed array of
 * struct ftalookup entries. It starts out as 0 and is set when the
 * registration code allocates the table. */
39 struct ftalookup * flookup =0;
43 /* Adds an FTA to the lookup table.
 *
 * Steps that can be read off this block:
 *   - obtain the gshub handle with get_hub(); a failure is logged at
 *     LOG_EMERG
 *   - allocate (malloc) or resize (realloc) the flookup array and select the
 *     first slot whose `used` field is 0
 *   - announce the FTA to GSHUB through set_ftainstance()
 *   - store duplicated copies of `name` and `schema`, plus the `reusable`
 *     value, in the selected slot
 *   - once the counted registrations reach the number of entries in
 *     fta_names, call set_initinstance()
 *
 * subscriber : FTAID supplied by the caller (not referenced by the
 *              statements shown here)
 * f          : FTAID of the FTA; its address is passed to set_ftainstance()
 * name       : FTA name; duplicated with strdup() into the table
 * reusable   : copied into the slot's `reusable` field
 *
 * NOTE(review): the return values and the conditions guarding several of the
 * statements below cannot be read off this block -- confirm them before
 * relying on this summary.
 */
44 gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,
45 FTAname name, gs_uint32_t reusable,
/* Running count of registrations; it is only incremented while it is >= 0. */
49 static gs_int32_t registered_ftas=0;
/* The hub handle is required by the set_ftainstance() and set_initinstance()
 * calls further down. */
51 if (get_hub(&gshub)!=0) {
52 gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");
/* Initial table: STARTSZ entries, zero-filled by the memset() below. */
56 if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {
57 gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");
60 memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);
/* Linear scan for the first slot with used == 0; the loop leaves
 * x == llookup when every slot is taken. */
63 for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);
/* Resize the table to llookup entries.
 * NOTE(review): assigning realloc()'s result directly to flookup drops the
 * only pointer to the old table if realloc() fails. The log text below also
 * says "ftacallback" although this is the ftalookup table -- confirm whether
 * that is intended. */
67 if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {
68 gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
/* Walks slots x .. llookup-1 (presumably to initialise the newly added
 * slots -- confirm). */
71 for (y=x;y<llookup;y++)
75 gslog(LOG_INFO,"Basic FTA can not be reusable\n");
/* Presumably a negative counter means counting has been switched off --
 * confirm. */
77 if (registered_ftas>=0)
78 registered_ftas++; // count them to know when everybody is in
80 // register non-basic FTAs with GSHUB
81 if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {
82 gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
86 gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);
/* The table keeps its own copies of both strings; they are released with
 * free() in ftalookup_unregister_fta().
 * NOTE(review): neither strdup() result is checked for a null pointer in the
 * statements shown. */
87 flookup[x].name=strdup(name);
89 flookup[x].reusable=reusable;
90 flookup[x].schema=strdup(schema);
/* x is reused here to count the entries of fta_names up to its 0 terminator.
 * When that count is <= registered_ftas, set_initinstance() is called for
 * this instance. */
93 if (registered_ftas>=0) {
94 for(x=0; fta_names[x]!=0;x++ );
95 if (x<=registered_ftas) {
96 if (set_initinstance(gshub,get_instance_name())!=0) {
97 gslog(LOG_EMERG,"hostlib::error::could not init instance");
106 /* Removes the FTA from the lookup table.
 *
 * Scans all llookup slots and frees the name and schema strings of every
 * slot that satisfies the condition below.
 *
 * subscriber : FTAID supplied by the caller (not referenced by the
 *              statements shown here)
 * f          : FTAID of the FTA to remove (not referenced by the statements
 *              shown here -- see the note)
 *
 * NOTE(review): as written, the condition only tests that the slot is in use
 * and that the streamid, ip, port and index fields of its ftaid are
 * non-zero; neither `f` nor `subscriber` appears in it. If the intent is to
 * remove only the entry belonging to `f`, each field should presumably be
 * compared with the corresponding field of `f` -- confirm.
 */
107 gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)
110 for(x=0;x<llookup;x++) {
111 if ((flookup[x].used==1)
112 && (flookup[x].ftaid.streamid)
113 && (flookup[x].ftaid.ip)
114 && (flookup[x].ftaid.port)
115 && (flookup[x].ftaid.index)) {
/* Release the strings that were duplicated with strdup() at registration. */
117 free(flookup[x].name);
118 free(flookup[x].schema);
124 /* Looks an FTA up by name.
 *
 * Two passes over the table are visible:
 *   1. the first in-use entry whose name matches, whose `reusable` field is
 *      >= 1 and whose streamid is non-zero (a reusable instance);
 *   2. the first in-use entry whose name matches and whose streamid is 0
 *      (the FTA itself rather than an instance).
 * On a match, *ftaid receives a copy of the entry's FTAID and *schema
 * receives the table's own schema pointer; no copy of the string is made, so
 * presumably the caller must not free it -- confirm.
 *
 * caller : FTAID supplied by the caller (not referenced by the statements
 *          shown here)
 * name   : FTA name, compared against each entry with strcmp()
 * reuse  : reuse value requested by the caller; printed in the diagnostic
 *          output
 * ftaid  : out parameter that receives the FTAID of the matching entry
 *
 * NOTE(review): the conditions selecting between the two passes, the guards
 * around the fprintf() diagnostics and the return values cannot be read off
 * this block -- confirm.
 */
125 gs_retval_t ftalookup_lookup_fta_index(FTAID caller,
126 FTAname name, gs_uint32_t reuse, FTAID * ftaid,
131 fprintf(stderr,"Name %s reusable %u\n",name,reuse);
134 /* grab the first reusable instance */
135 for(x=0;x<llookup;x++) {
136 if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
137 && (flookup[x].reusable>=1)
138 && (flookup[x].ftaid.streamid!=0)) {
139 *ftaid=flookup[x].ftaid;
140 *schema=flookup[x].schema;
142 fprintf(stderr,"\tREUSE FTA\n");
148 /* grab the first FTA with this name that is not an instance */
149 for(x=0;x<llookup;x++) {
150 if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
151 && (flookup[x].ftaid.streamid==0)) {
152 *ftaid=flookup[x].ftaid;
153 *schema=flookup[x].schema;
155 fprintf(stderr,"\tNEW FTA\n");
158 gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
163 fprintf(stderr,"NO MATCH\n");
/* Producer-failure entry point of the registry; takes the FTAID of the
 * caller and the FTAID of the producer.
 * NOTE(review): presumably invoked when `producer` is reported as failed --
 * confirm against the body what, if anything, is done with the arguments. */
168 gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {
/* Handles a heartbeat that carries a trace of `sz` fta_stat records.
 *
 * At most one record -- the last one, trace[sz-1] -- is forwarded to GSHUB
 * through set_instancestats(); the heartbeat header and every record of the
 * trace are written to the debug log.
 *
 * caller_id : FTAID of the sender; only logged in the statements shown
 * trace_id  : identifier of the trace; only logged in the statements shown
 * sz        : number of records in `trace`
 * trace     : array of `sz` statistics records
 *
 * NOTE(review): sz is unsigned, so trace[sz-1] indexes far outside the array
 * when sz == 0, and no guard for that case can be read off this block --
 * confirm that callers never pass an empty trace.
 */
172 gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,
173 gs_uint32_t sz, fta_stat * trace){
177 if (get_hub(&gshub)!=0) {
178 gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");
182 // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1
183 // for application heartbeats (streamid=0) we will only send last stat in their traces
184 if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {
185 if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {
186 gslog(LOG_EMERG,"ERROR:could not set instancestats");
/* NOTE(review): %llu assumes gs_uint64_t is unsigned long long, and %d is
 * given the unsigned sz -- confirm against the typedefs. */
193 gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);
/* Dump every record of the trace, one debug log line per record. */
194 for (i = 0; i < sz; ++i) {
195 gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,
196 trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,
197 trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);