Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscpaux / gshub.cpp
1 /* ------------------------------------------------\r
2 Copyright 2014 AT&T Intellectual Property\r
3    Licensed under the Apache License, Version 2.0 (the "License");\r
4    you may not use this file except in compliance with the License.\r
5    You may obtain a copy of the License at\r
6 \r
7      http://www.apache.org/licenses/LICENSE-2.0\r
8 \r
9    Unless required by applicable law or agreed to in writing, software\r
10    distributed under the License is distributed on an "AS IS" BASIS,\r
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12    See the License for the specific language governing permissions and\r
13    limitations under the License.\r
14  ------------------------------------------- */\r
15 \r
16 #include "gshub.h"\r
17 #include "gslog.h"\r
18 \r
19 #include <stdio.h>\r
20 #include <stdlib.h>\r
21 #include <string.h>\r
22 #include <arpa/inet.h>\r
23 #include <unistd.h>\r
24 \r
25 #include "simple_http.h"\r
26 #include "json.h"\r
27 \r
// maximum length of URL for http requests
// (parenthesized so the macro expands safely inside larger expressions,
// e.g. "x / MAX_URL_LEN" or "x % MAX_URL_LEN")
#define MAX_URL_LEN (64 * 1024)

// maximum length of JSON response
#define MAX_JSON_LEN (64 * 1024)

// sleep time (in seconds, passed to sleep()) between HTTP request retries
#define HTTP_RETRY_INTERVAL 5
36 \r
37 // GSHUB endpoint\r
38 endpoint hub;\r
39 \r
40 // GSHUB instance_name\r
41 gs_sp_t instance_name=0;\r
42 \r
43 \r
44 // retrieve the endpoint of gs instance, source or sink identified by url\r
45 gs_retval_t get_service_endpoint (endpoint gshub, gs_csp_t url, endpoint* instance, gs_bool_t block) {\r
46         int res, ret;\r
47         gs_uint32_t http_code;\r
48         char json[MAX_JSON_LEN];\r
49 \r
50         while (true) {\r
51                 res = http_get_request(gshub, url, &http_code, json);\r
52                 if(res) {\r
53                         fprintf(stderr, "http_get_request() failed\n");\r
54                         ret = -1;\r
55                 }\r
56                 else {\r
57                         // in blocking mode we will keep retrying\r
58                         if (http_code == 400) {\r
59                                 if (block) {\r
60                                         sleep(HTTP_RETRY_INTERVAL);\r
61                                         continue;\r
62                                 } else {\r
63                                         ret = 1;\r
64                                         break;\r
65                                 }\r
66 \r
67                         }\r
68                         if (http_code == 200 ) {\r
69                                 // if instance is NULL there is no need to parse json\r
70                                 if (!instance) {\r
71                                         ret = 0;\r
72                                         break;\r
73                                 }\r
74 \r
75                                 // now parse json response\r
76                                 char *errorPos = 0;\r
77                                 char *errorDesc = 0;\r
78                                 int errorLine = 0;\r
79                                 block_allocator allocator(1 << 10); // 1 KB per block\r
80 \r
81                                 json_value *root = json_parse(json, &errorPos, (const char**)&errorDesc, &errorLine, &allocator);\r
82                                 if (!root) {\r
83                                         fprintf(stderr, "GSHUB returned invalid json response, error description - %s, error line - %d\n", errorDesc, errorLine);\r
84                                         ret = -1;\r
85                                 } else {\r
86                                         unsigned long json_ip = 0;\r
87                                         bool ip_set = false;\r
88                                         unsigned json_port = 0;\r
89                                         bool port_set = false;\r
90                                         for (json_value *it = root->first_child; it; it = it->next_sibling) {\r
91                                                 if (it->name) {\r
92                                                         if (!strcmp(it->name, "ip") && it->type == JSON_STRING) {\r
93                                                                 inet_pton(AF_INET, it->string_value, &json_ip);\r
94                                                                 ip_set = true;\r
95                                                         } else if (!strcmp(it->name, "port") && it->type == JSON_INT) {\r
96                                                                 json_port = htons(it->int_value);\r
97                                                                 port_set = true;\r
98                                                         }\r
99                                                 }\r
100                                         }\r
101 \r
102                                         if (!ip_set || !port_set) {\r
103                                                 fprintf(stderr, "GSHUB returned json response with missing ip or port fields");\r
104                                                 ret = -1;\r
105                                         }\r
106                                         instance->ip = json_ip;\r
107                                         instance->port = json_port;\r
108 \r
109                                         ret = 0;\r
110                                 }\r
111                         } else\r
112                                 ret = -1;\r
113                 }\r
114                 break;\r
115         }\r
116 \r
117         return ret;\r
118 \r
119 }\r
120 \r
121 // announce gs instance/sink/source to gshub. GSHUB is identified by URL, instance/sink/source information is in application/json\r
122 gs_retval_t set_endpoint_info (endpoint gshub, gs_csp_t url, char* info) {\r
123         int res;\r
124         gs_uint32_t http_code;\r
125 \r
126         res = http_post_request(gshub, url, info, &http_code);\r
127         if (res) {\r
128                 fprintf(stderr, "http_post_request() failed\n");\r
129                 return -1;\r
130         }\r
131 \r
132         if (http_code == 200)\r
133                 return 0;\r
134         else\r
135                 return -1;\r
136 }\r
137 \r
138 extern "C" gs_retval_t set_instance_name(gs_sp_t instancename){\r
139         instance_name=strdup(instancename);\r
140 \r
141         return 0;\r
142 }\r
143 \r
144 extern "C" gs_sp_t get_instance_name() {\r
145         return instance_name;\r
146 }\r
147 \r
148 // save gshub endpoint\r
149 extern "C" gs_retval_t set_hub(endpoint gshub) {\r
150         hub.ip = gshub.ip;\r
151         hub.port = gshub.port;\r
152 \r
153         return 0;\r
154 }\r
155 \r
156 // retrieve gsbub endpoint\r
157 extern "C" gs_retval_t get_hub(endpoint* gshub) {\r
158 \r
159         if (hub.ip==0)\r
160                 return -1;\r
161         gshub->ip = hub.ip;\r
162         gshub->port = hub.port;\r
163 \r
164         return 0;\r
165 }\r
166 \r
167 // Discover gs instance endpoint by name.\r
168 extern "C" gs_retval_t get_instance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {\r
169         char url[MAX_URL_LEN];\r
170 \r
171         sprintf(url, "%s/%s", DISCOVER_INSTANCE_URL, instance_name);\r
172 \r
173         return get_service_endpoint(gshub, url, instance, block);\r
174 }\r
175 \r
176 // Discover initialized gs instance endpoint by name.\r
177 extern "C" gs_retval_t get_initinstance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {\r
178         char url[MAX_URL_LEN];\r
179 \r
180         sprintf(url, "%s/%s", DISCOVER_INITINSTANCE_URL, instance_name);\r
181 \r
182         return get_service_endpoint(gshub, url, instance, block);\r
183 }\r
184 \r
185 // Discover stream source endpoint by name.\r
186 extern "C" gs_retval_t get_streamsource(endpoint gshub, gs_sp_t source_name, endpoint* source, gs_bool_t block) {\r
187         char url[MAX_URL_LEN];\r
188 \r
189         sprintf(url, "%s/%s", DISCOVER_SOURCE_URL, source_name);\r
190 \r
191         return get_service_endpoint(gshub, url, source, block);\r
192 }\r
193 \r
194 // Discover stream sink endpoint by name.\r
195 extern "C" gs_retval_t get_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint* sink, gs_bool_t block) {\r
196         char url[MAX_URL_LEN];\r
197 \r
198         sprintf(url, "%s/%s", DISCOVER_SINK_URL, sink_name);\r
199 \r
200         return get_service_endpoint(gshub, url, sink, block);\r
201 }\r
202 \r
203 // Discover if an isntance should start processing\r
204 gs_retval_t get_startprocessing(endpoint gshub, gs_sp_t instance_name, gs_bool_t block) {\r
205         char url[MAX_URL_LEN];\r
206 \r
207         sprintf(url, "%s/%s", DISCOVER_STARTPROCESSING_URL, instance_name);\r
208 \r
209         return get_service_endpoint(gshub, url, NULL, block);\r
210 }\r
211 \r
212 // Announce gs instance endpoint to gshub\r
213 extern "C" gs_retval_t set_instance(endpoint gshub, gs_sp_t instance_name, endpoint instance) {\r
214         char ipstr[16];\r
215         char info[MAX_JSON_LEN];\r
216 \r
217         inet_ntop(AF_INET, &instance.ip, ipstr, INET_ADDRSTRLEN);\r
218         sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", instance_name, ipstr, ntohs(instance.port));\r
219 \r
220         return set_endpoint_info(gshub, ANNOUNCE_INSTANCE_URL, info);\r
221 }\r
222 \r
223 // Announce initialized gs instance endpoint to gshub\r
224 extern "C" gs_retval_t set_initinstance(endpoint gshub, gs_sp_t instance_name) {\r
225         char info[MAX_JSON_LEN];\r
226 \r
227         sprintf(info, "{\"name\": \"%s\"}", instance_name);\r
228 \r
229         return set_endpoint_info(gshub, ANNOUNCE_INITINSTANCE_URL, info);\r
230 }\r
231 \r
232 // Announce stream source endpoint to gshub\r
233 extern "C" gs_retval_t set_streamsource(endpoint gshub, gs_sp_t source_name, endpoint source) {\r
234         char ipstr[16];\r
235         char info[MAX_JSON_LEN];\r
236 \r
237         inet_ntop(AF_INET, &source.ip, ipstr, INET_ADDRSTRLEN);\r
238         sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", source_name, ipstr, ntohs(source.port));\r
239 \r
240         return set_endpoint_info(gshub, ANNOUNCE_SOURCE_URL, info);\r
241 }\r
242 \r
243 // Announce stream source endpoint to gshub\r
244 extern "C" gs_retval_t set_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint sink) {\r
245         char ipstr[16];\r
246         char info[MAX_JSON_LEN];\r
247 \r
248         inet_ntop(AF_INET, &sink.ip, ipstr, INET_ADDRSTRLEN);\r
249         sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", sink_name, ipstr, ntohs(sink.port));\r
250 \r
251         return set_endpoint_info(gshub, ANNOUNCE_SINK_URL, info);\r
252 }\r
253 \r
254 // Announce to gshub that an instance can start processin\r
255 extern "C" gs_retval_t set_startprocessing(endpoint gshub, gs_sp_t instance_name) {\r
256         char info[MAX_JSON_LEN];\r
257 \r
258         sprintf(info, "{\"name\": \"%s\"}", instance_name);\r
259 \r
260         return set_endpoint_info(gshub, ANNOUNCE_STARTPROCESSING_URL, info);\r
261 }\r
262 \r
263 // Announce stream subscriptino to gshub\r
264 extern "C" gs_retval_t set_streamsubscription(endpoint gshub, gs_sp_t instance_name, gs_sp_t sink_name) {\r
265         char info[MAX_JSON_LEN];\r
266 \r
267         sprintf(info, "{\"name\": \"%s\", \"sink\": \"%s\"}", instance_name, sink_name);\r
268 \r
269         return set_endpoint_info(gshub, ANNOUNCE_STREAM_SUBSCRIPTION, info);\r
270 }\r
271 \r
272 // Announce new fta instantiation to gshub\r
273 extern "C" gs_retval_t set_ftainstance(endpoint gshub, gs_sp_t instance_name, gs_sp_t ftainstance_name, FTAID* id) {\r
274         char info[MAX_JSON_LEN];\r
275 \r
276         sprintf(info, "{\"name\": \"%s\", \"fta_name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %lli}}",\r
277                 instance_name, ftainstance_name, id->ip, id->port, id->index, id->streamid);\r
278 \r
279         return set_endpoint_info(gshub, ANNOUNCE_FTA_INSTANCE, info);\r
280 }\r
281 \r
282 // Announce fta instance stats to gshub\r
283 extern "C" gs_retval_t set_instancestats(endpoint gshub, gs_sp_t instance_name, fta_stat* stats) {\r
284         char url[MAX_URL_LEN];\r
285         char info[MAX_JSON_LEN];\r
286 \r
287         sprintf(info, "{\"name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %llu}, "\r
288                 "\"metrics\": {\"in_tuple_cnt\": %u, \"out_tuple_cnt\": %u, \"out_tuple_sz\": %u, \"accepted_tuple_cnt\": %u, \"cycle_cnt\": %llu, \"collision_cnt\": %u, \"eviction_cnt\": %u, \"sampling_rate\": %f}}",\r
289                 instance_name, stats->ftaid.ip, stats->ftaid.port, stats->ftaid.index, stats->ftaid.streamid,\r
290                 stats->in_tuple_cnt, stats->out_tuple_cnt, stats->out_tuple_sz, stats->accepted_tuple_cnt, stats->cycle_cnt, stats->collision_cnt, stats->eviction_cnt, stats->sampling_rate);\r
291 \r
292         return set_endpoint_info(gshub, ANNOUNCE_METRICS, info);\r
293 }\r
294 \r