1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <arpa/inet.h>
25 #include "simple_http.h"
// Maximum length, in bytes, of a URL built for an HTTP request.
// The value is parenthesized so the macro expands as a single operand
// inside larger expressions (e.g. `x / MAX_URL_LEN`).
#define MAX_URL_LEN (64 * 1024)

// Maximum length, in bytes, of a JSON request body or JSON response.
#define MAX_JSON_LEN (64 * 1024)

// Sleep time, in seconds, between HTTP request retries (passed to sleep()).
#define HTTP_RETRY_INTERVAL 5
40 // GSHUB instance_name
41 gs_sp_t instance_name=0;
44 // Retrieve the endpoint of a gs instance, source or sink identified by url.
//
// Calls http_get_request() for `url` on `gshub`; when the HTTP status is 200
// the "ip" and "port" members of the returned JSON are copied into *instance.
//
//   gshub    - hub endpoint the request is sent to
//   url      - request URL identifying the service to look up
//   instance - receives ip and port; per the comment in the body, a NULL
//              instance means the JSON response does not need to be parsed
//   block    - blocking flag; per the comment in the body, blocking mode
//              keeps retrying
//
// Both fields are stored in network byte order: inet_pton() produces the
// address that way and the port is passed through htons().
45 gs_retval_t get_service_endpoint (endpoint gshub, gs_csp_t url, endpoint* instance, gs_bool_t block) {
// HTTP status code filled in by http_get_request().
47 gs_uint32_t http_code;
// Receives the response body. NOTE(review): the buffer size is not passed to
// http_get_request() here, so the bound must be enforced inside it - confirm.
48 char json[MAX_JSON_LEN];
51 res = http_get_request(gshub, url, &http_code, json);
53 fprintf(stderr, "http_get_request() failed\n");
57 // in blocking mode we will keep retrying
58 if (http_code == 400) {
// Wait HTTP_RETRY_INTERVAL seconds before the next attempt.
60 sleep(HTTP_RETRY_INTERVAL);
68 if (http_code == 200 ) {
69 // if instance is NULL there is no need to parse json
75 // now parse json response
79 block_allocator allocator(1 << 10); // 1 KB per block
81 json_value *root = json_parse(json, &errorPos, (const char**)&errorDesc, &errorLine, &allocator);
83 fprintf(stderr, "GSHUB returned invalid json response, error description - %s, error line - %d\n", errorDesc, errorLine);
// NOTE(review): inet_pton(AF_INET, ...) writes a 4-byte address, while
// unsigned long may be wider; on such platforms only part of json_ip is
// written, and which part depends on endianness.
86 unsigned long json_ip = 0;
88 unsigned json_port = 0;
89 bool port_set = false;
// Walk the children of the parsed root looking for the "ip" and "port" members.
90 for (json_value *it = root->first_child; it; it = it->next_sibling) {
// "ip" must be a JSON string; it is converted from dotted-quad text.
// NOTE(review): the inet_pton() return value is not checked, so a malformed
// address string is not detected at this call.
92 if (!strcmp(it->name, "ip") && it->type == JSON_STRING) {
93 inet_pton(AF_INET, it->string_value, &json_ip);
// "port" must be a JSON integer; it is converted to network byte order.
// NOTE(review): no range check before htons() truncates the value to 16 bits.
95 } else if (!strcmp(it->name, "port") && it->type == JSON_INT) {
96 json_port = htons(it->int_value);
// Both fields are required for a usable endpoint.
102 if (!ip_set || !port_set) {
// NOTE(review): unlike the other messages in this function, this one has no trailing "\n".
103 fprintf(stderr, "GSHUB returned json response with missing ip or port fields");
106 instance->ip = json_ip;
107 instance->port = json_port;
121 // Announce a gs instance/sink/source to gshub. GSHUB is identified by URL; the instance/sink/source information is in application/json.
//
// Hands `info` to http_post_request() for `url` on `gshub`, prints a message
// to stderr on that call's failure path, and then tests the HTTP status
// code against 200.
//
//   gshub - hub endpoint the request is sent to
//   url   - announcement URL on the hub
//   info  - request body; the callers in this file pass a JSON object string
122 gs_retval_t set_endpoint_info (endpoint gshub, gs_csp_t url, char* info) {
// HTTP status code filled in by http_post_request().
124 gs_uint32_t http_code;
126 res = http_post_request(gshub, url, info, &http_code);
128 fprintf(stderr, "http_post_request() failed\n");
// 200 is the only status code this function tests for.
132 if (http_code == 200)
// Store a private copy of `instancename` in the global instance_name.
//
// NOTE(review): the strdup() result is not checked for NULL, and a copy
// stored by an earlier call is overwritten without being freed.
138 extern "C" gs_retval_t set_instance_name(gs_sp_t instancename){
// strdup() allocates the copy, so the caller's buffer is not retained.
139 instance_name=strdup(instancename);
144 extern "C" gs_sp_t get_instance_name() {
145 return instance_name;
148 // Save the gshub endpoint in the shared `hub` variable (declared outside this block).
149 extern "C" gs_retval_t set_hub(endpoint gshub) {
// Copy the port of the caller's endpoint into the saved hub endpoint.
151 hub.port = gshub.port;
156 // Retrieve the gshub endpoint held in the shared `hub` variable.
//
//   gshub - caller-supplied endpoint that receives the saved values
157 extern "C" gs_retval_t get_hub(endpoint* gshub) {
// Copy the saved port into the caller's endpoint.
162 gshub->port = hub.port;
167 // Discover gs instance endpoint by name.
168 extern "C" gs_retval_t get_instance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {
169 char url[MAX_URL_LEN];
171 sprintf(url, "%s/%s", DISCOVER_INSTANCE_URL, instance_name);
173 return get_service_endpoint(gshub, url, instance, block);
176 // Discover initialized gs instance endpoint by name.
177 extern "C" gs_retval_t get_initinstance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {
178 char url[MAX_URL_LEN];
180 sprintf(url, "%s/%s", DISCOVER_INITINSTANCE_URL, instance_name);
182 return get_service_endpoint(gshub, url, instance, block);
185 // Discover stream source endpoint by name.
186 extern "C" gs_retval_t get_streamsource(endpoint gshub, gs_sp_t source_name, endpoint* source, gs_bool_t block) {
187 char url[MAX_URL_LEN];
189 sprintf(url, "%s/%s", DISCOVER_SOURCE_URL, source_name);
191 return get_service_endpoint(gshub, url, source, block);
194 // Discover stream sink endpoint by name.
195 extern "C" gs_retval_t get_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint* sink, gs_bool_t block) {
196 char url[MAX_URL_LEN];
198 sprintf(url, "%s/%s", DISCOVER_SINK_URL, sink_name);
200 return get_service_endpoint(gshub, url, sink, block);
203 // Discover if an isntance should start processing
204 gs_retval_t get_startprocessing(endpoint gshub, gs_sp_t instance_name, gs_bool_t block) {
205 char url[MAX_URL_LEN];
207 sprintf(url, "%s/%s", DISCOVER_STARTPROCESSING_URL, instance_name);
209 return get_service_endpoint(gshub, url, NULL, block);
212 // Announce gs instance endpoint to gshub
213 extern "C" gs_retval_t set_instance(endpoint gshub, gs_sp_t instance_name, endpoint instance) {
215 char info[MAX_JSON_LEN];
217 inet_ntop(AF_INET, &instance.ip, ipstr, INET_ADDRSTRLEN);
218 sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", instance_name, ipstr, ntohs(instance.port));
220 return set_endpoint_info(gshub, ANNOUNCE_INSTANCE_URL, info);
223 // Announce initialized gs instance endpoint to gshub
224 extern "C" gs_retval_t set_initinstance(endpoint gshub, gs_sp_t instance_name) {
225 char info[MAX_JSON_LEN];
227 sprintf(info, "{\"name\": \"%s\"}", instance_name);
229 return set_endpoint_info(gshub, ANNOUNCE_INITINSTANCE_URL, info);
232 // Announce stream source endpoint to gshub
233 extern "C" gs_retval_t set_streamsource(endpoint gshub, gs_sp_t source_name, endpoint source) {
235 char info[MAX_JSON_LEN];
237 inet_ntop(AF_INET, &source.ip, ipstr, INET_ADDRSTRLEN);
238 sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", source_name, ipstr, ntohs(source.port));
240 return set_endpoint_info(gshub, ANNOUNCE_SOURCE_URL, info);
243 // Announce stream source endpoint to gshub
244 extern "C" gs_retval_t set_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint sink) {
246 char info[MAX_JSON_LEN];
248 inet_ntop(AF_INET, &sink.ip, ipstr, INET_ADDRSTRLEN);
249 sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", sink_name, ipstr, ntohs(sink.port));
251 return set_endpoint_info(gshub, ANNOUNCE_SINK_URL, info);
254 // Announce to gshub that an instance can start processin
255 extern "C" gs_retval_t set_startprocessing(endpoint gshub, gs_sp_t instance_name) {
256 char info[MAX_JSON_LEN];
258 sprintf(info, "{\"name\": \"%s\"}", instance_name);
260 return set_endpoint_info(gshub, ANNOUNCE_STARTPROCESSING_URL, info);
263 // Announce stream subscriptino to gshub
264 extern "C" gs_retval_t set_streamsubscription(endpoint gshub, gs_sp_t instance_name, gs_sp_t sink_name) {
265 char info[MAX_JSON_LEN];
267 sprintf(info, "{\"name\": \"%s\", \"sink\": \"%s\"}", instance_name, sink_name);
269 return set_endpoint_info(gshub, ANNOUNCE_STREAM_SUBSCRIPTION, info);
272 // Announce new fta instantiation to gshub
273 extern "C" gs_retval_t set_ftainstance(endpoint gshub, gs_sp_t instance_name, gs_sp_t ftainstance_name, FTAID* id) {
274 char info[MAX_JSON_LEN];
276 sprintf(info, "{\"name\": \"%s\", \"fta_name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %lli}}",
277 instance_name, ftainstance_name, id->ip, id->port, id->index, id->streamid);
279 return set_endpoint_info(gshub, ANNOUNCE_FTA_INSTANCE, info);
282 // Announce fta instance stats to gshub
283 extern "C" gs_retval_t set_instancestats(endpoint gshub, gs_sp_t instance_name, fta_stat* stats) {
284 char url[MAX_URL_LEN];
285 char info[MAX_JSON_LEN];
287 sprintf(info, "{\"name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %llu}, "
288 "\"metrics\": {\"in_tuple_cnt\": %u, \"out_tuple_cnt\": %u, \"out_tuple_sz\": %u, \"accepted_tuple_cnt\": %u, \"cycle_cnt\": %llu, \"collision_cnt\": %u, \"eviction_cnt\": %u, \"sampling_rate\": %f}}",
289 instance_name, stats->ftaid.ip, stats->ftaid.port, stats->ftaid.index, stats->ftaid.streamid,
290 stats->in_tuple_cnt, stats->out_tuple_cnt, stats->out_tuple_sz, stats->accepted_tuple_cnt, stats->cycle_cnt, stats->collision_cnt, stats->eviction_cnt, stats->sampling_rate);
292 return set_endpoint_info(gshub, ANNOUNCE_METRICS, info);