84dd266e6d2386f512405c4faeda430af3f64a55
[com/gs-lite.git] / src / lib / gscphost / clearinghouseregistries.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6
7  http://www.apache.org/licenses/LICENSE-2.0
8
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
16 #include "gsconfig.h"
17 #include "gstypes.h"
18 #include <lapp.h>
19 #include <clearinghouseregistries.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include "gshub.h"
24
25 extern const gs_sp_t fta_names[];
26
27
28 /* fta lookup registry in the clearinghouse */
29
30
31 struct ftalookup {
32     gs_int32_t used;
33     gs_sp_t name;
34     FTAID ftaid;
35     gs_uint32_t  reusable;
36     gs_sp_t  schema;
37 };
38
39 struct ftalookup * flookup =0;
40 gs_int32_t llookup=0;
41
42
43 /* Adds a FTA to the lookup table */
44 gs_retval_t ftalookup_register_fta( FTAID subscriber, FTAID f,
45                                    FTAname name, gs_uint32_t  reusable,
46                                    gs_csp_t  schema)
47 {
48     gs_int32_t x;
49     static gs_int32_t registered_ftas=0;
50     endpoint gshub;
51     if (get_hub(&gshub)!=0) {
52         gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta/instance");
53         return -1;
54     }
55     if (llookup == 0) {
56         if ((flookup = malloc(sizeof(struct ftalookup)*STARTSZ))==0) {
57             gslog(LOG_EMERG,"ERROR:Out of memory ftalookup\n");
58             return -1;
59         }
60         memset(flookup,0,sizeof(struct ftalookup)*STARTSZ);
61         llookup = STARTSZ;
62     }
63     for(x=0;(x<llookup)&&(flookup[x].used!=0);x++);
64     if (x == llookup) {
65         gs_int32_t y;
66         llookup = 2*llookup;
67         if ((flookup = realloc(flookup,llookup*sizeof(struct ftalookup)))==0) {
68             gslog(LOG_EMERG,"ERROR:Out of memory ftacallback\n");
69             return -1;
70         }
71         for (y=x;y<llookup;y++)
72             flookup[y].used=0;
73     }
74     if (f.streamid==0) {
75         gslog(LOG_INFO,"Basic FTA can not be reusable\n");
76         reusable=0;
77         if (registered_ftas>=0)
78             registered_ftas++; // count them to know when everybody is in
79     } else {
80         // register none basic FTA's with GSHUB
81         if (set_ftainstance(gshub,get_instance_name(),(gs_sp_t)name,&f)!=0) {
82             gslog(LOG_EMERG,"ERROR:could not set_ftainstance");
83             return -1;
84         }
85     }
86     gslog(LOG_INFO,"Adding fta to registry %s reuseable %u\n",name,reusable);
87     flookup[x].name=strdup(name);
88     flookup[x].ftaid=f;
89     flookup[x].reusable=reusable;
90     flookup[x].schema=strdup(schema);
91     flookup[x].used=1;
92
93     if (registered_ftas>=0) {
94         for(x=0; fta_names[x]!=0;x++ );
95         if (x<=registered_ftas) {
96             if (set_initinstance(gshub,get_instance_name())!=0) {
97                 gslog(LOG_EMERG,"hostlib::error::could not init instance");
98                 return -1;
99             }
100             registered_ftas=-1;
101         }
102     }
103     return 0;
104 }
105
106 /* Removes the FTA from the lookup table */
107 gs_retval_t ftalookup_unregister_fta(FTAID subscriber,FTAID f)
108 {
109     gs_int32_t x;
110     for(x=0;x<llookup;x++) {
111         if ((flookup[x].used==1)
112             && (flookup[x].ftaid.streamid)
113             && (flookup[x].ftaid.ip)
114             && (flookup[x].ftaid.port)
115             && (flookup[x].ftaid.index)) {
116             flookup[x].used=0;
117             free(flookup[x].name);
118             free(flookup[x].schema);
119         }
120     }
121     return 0;
122 }
123
124 /* Looks an FTA up by name */
125 gs_retval_t ftalookup_lookup_fta_index(FTAID caller,
126                                        FTAname name, gs_uint32_t  reuse, FTAID * ftaid,
127                                        gs_csp_t  * schema)
128 {
129     gs_int32_t x;
130 #ifdef PRINTMSG
131     fprintf(stderr,"Name %s reusable %u\n",name,reuse);
132 #endif
133     if (reuse==1) {
134         /* grep the firs reusable instance */
135         for(x=0;x<llookup;x++) {
136             if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
137                 && (flookup[x].reusable>=1)
138                 && (flookup[x].ftaid.streamid!=0)) {
139                 *ftaid=flookup[x].ftaid;
140                 *schema=flookup[x].schema;
141 #ifdef PRINTMSG
142                 fprintf(stderr,"\tREUSE FTA\n");
143 #endif
144                 return 0;
145             }
146         }
147     }
148     /* grep the first fta with the name not an instance */
149     for(x=0;x<llookup;x++) {
150         if ((flookup[x].used==1)&&(strcmp(flookup[x].name,name)==0)
151             && (flookup[x].ftaid.streamid==0)) {
152             *ftaid=flookup[x].ftaid;
153             *schema=flookup[x].schema;
154 #ifdef PRINTMSG
155             fprintf(stderr,"\tNEW FTA\n");
156 #endif
157
158             gslog(LOG_DEBUG,"Lookup of FTA %s with FTAID {ip=%u,port=%u,index=%u,streamid=%u}\n",name,ftaid->ip,ftaid->port,ftaid->index,ftaid->streamid);
159             return 0;
160         }
161     }
162 #ifdef PRINTMSG
163     fprintf(stderr,"NO MATCH\n");
164 #endif
165     return -1;
166 }
167
168 gs_retval_t ftalookup_producer_failure(FTAID caller,FTAID producer) {
169     return 0;
170 }
171
172 gs_retval_t ftalookup_heartbeat(FTAID caller_id, gs_uint64_t trace_id,
173                                 gs_uint32_t  sz, fta_stat * trace){
174
175         gs_uint32_t i = 0;
176     endpoint gshub;
177     if (get_hub(&gshub)!=0) {
178         gslog(LOG_EMERG,"ERROR:could not find gshub to announce fta");
179         return -1;
180     }
181
182     // to avoid sending redundant FTA instance stats to GSHUB we will only send statistics that have trace size of 1
183         // for application heartbeats (streamid=0) we will only send last stat in their traces
184     if ((sz == 1) || (trace[sz-1].ftaid.streamid == 0)) {
185         if (set_instancestats(gshub,get_instance_name(),&trace[sz-1])!=0) {
186                 gslog(LOG_EMERG,"ERROR:could not set instancestats");
187                 return -1;
188                 }
189
190     }
191
192     #ifdef PRINTMSG
193         gslog(LOG_DEBUG,"Heartbeat trace from FTA {ip=%u,port=%u,index=%u,streamid=%u}, trace_id=%llu ntrace=%d\n", caller_id.ip,caller_id.port,caller_id.index,caller_id.streamid, trace_id,sz);
194         for (i = 0; i < sz; ++i) {
195                 gslog(LOG_DEBUG,"trace_id=%llu, trace[%u].ftaid={ip=%u,port=%u,index=%u,streamid=%u}, fta_stat={in_tuple_cnt=%u,out_tuple_cnt=%u,out_tuple_sz=%u,accepted_tuple_cnt=%u,cycle_cnt=%llu,collision_cnt=%u,eviction_cnt=%u,sampling_rate=%f}\n", trace_id, i,
196               trace[i].ftaid.ip,trace[i].ftaid.port,trace[i].ftaid.index,trace[i].ftaid.streamid,
197               trace[i].in_tuple_cnt, trace[i].out_tuple_cnt, trace[i].out_tuple_sz, trace[i].accepted_tuple_cnt, trace[i].cycle_cnt, trace[i].collision_cnt, trace[i].eviction_cnt, trace[i].sampling_rate);
198         }
199     #endif
200     return 0;
201 }