-/* ------------------------------------------------\r
-Copyright 2014 AT&T Intellectual Property\r
- Licensed under the Apache License, Version 2.0 (the "License");\r
- you may not use this file except in compliance with the License.\r
- You may obtain a copy of the License at\r
-\r
- http://www.apache.org/licenses/LICENSE-2.0\r
-\r
- Unless required by applicable law or agreed to in writing, software\r
- distributed under the License is distributed on an "AS IS" BASIS,\r
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- See the License for the specific language governing permissions and\r
- limitations under the License.\r
- ------------------------------------------- */\r
-\r
-#include "gshub.h"\r
-#include "gslog.h"\r
-\r
-#include <stdio.h>\r
-#include <stdlib.h>\r
-#include <string.h>\r
-#include <arpa/inet.h>\r
-#include <unistd.h>\r
-\r
-#include "simple_http.h"\r
-#include "json.h"\r
-\r
-// maximum length of URL for http requests\r
-#define MAX_URL_LEN 64 * 1024\r
-\r
-// maximu length of JSON response\r
-#define MAX_JSON_LEN 64 * 1024\r
-\r
-// sleep time between HTTP request retries\r
-#define HTTP_RETRY_INTERVAL 5\r
-\r
-// GSHUB endpoint\r
-endpoint hub;\r
-\r
-// GSHUB instance_name\r
-gs_sp_t instance_name=0;\r
-\r
-\r
-// retrieve the endpoint of gs instance, source or sink identified by url\r
-gs_retval_t get_service_endpoint (endpoint gshub, gs_csp_t url, endpoint* instance, gs_bool_t block) {\r
- int res, ret;\r
- gs_uint32_t http_code;\r
- char json[MAX_JSON_LEN];\r
-\r
- while (true) {\r
- res = http_get_request(gshub, url, &http_code, json);\r
- if(res) {\r
- fprintf(stderr, "http_get_request() failed\n");\r
- ret = -1;\r
- }\r
- else {\r
- // in blocking mode we will keep retrying\r
- if (http_code == 400) {\r
- if (block) {\r
- sleep(HTTP_RETRY_INTERVAL);\r
- continue;\r
- } else {\r
- ret = 1;\r
- break;\r
- }\r
-\r
- }\r
- if (http_code == 200 ) {\r
- // if instance is NULL there is no need to parse json\r
- if (!instance) {\r
- ret = 0;\r
- break;\r
- }\r
-\r
- // now parse json response\r
- char *errorPos = 0;\r
- char *errorDesc = 0;\r
- int errorLine = 0;\r
- block_allocator allocator(1 << 10); // 1 KB per block\r
-\r
- json_value *root = json_parse(json, &errorPos, (const char**)&errorDesc, &errorLine, &allocator);\r
- if (!root) {\r
- fprintf(stderr, "GSHUB returned invalid json response, error description - %s, error line - %d\n", errorDesc, errorLine);\r
- ret = -1;\r
- } else {\r
- unsigned long json_ip = 0;\r
- bool ip_set = false;\r
- unsigned json_port = 0;\r
- bool port_set = false;\r
- for (json_value *it = root->first_child; it; it = it->next_sibling) {\r
- if (it->name) {\r
- if (!strcmp(it->name, "ip") && it->type == JSON_STRING) {\r
- inet_pton(AF_INET, it->string_value, &json_ip);\r
- ip_set = true;\r
- } else if (!strcmp(it->name, "port") && it->type == JSON_INT) {\r
- json_port = htons(it->int_value);\r
- port_set = true;\r
- }\r
- }\r
- }\r
-\r
- if (!ip_set || !port_set) {\r
- fprintf(stderr, "GSHUB returned json response with missing ip or port fields");\r
- ret = -1;\r
- }\r
- instance->ip = json_ip;\r
- instance->port = json_port;\r
-\r
- ret = 0;\r
- }\r
- } else\r
- ret = -1;\r
- }\r
- break;\r
- }\r
-\r
- return ret;\r
-\r
-}\r
-\r
-// announce gs instance/sink/source to gshub. GSHUB is identified by URL, instance/sink/source information is in application/json\r
-gs_retval_t set_endpoint_info (endpoint gshub, gs_csp_t url, char* info) {\r
- int res;\r
- gs_uint32_t http_code;\r
-\r
- res = http_post_request(gshub, url, info, &http_code);\r
- if (res) {\r
- fprintf(stderr, "http_post_request() failed\n");\r
- return -1;\r
- }\r
-\r
- if (http_code == 200)\r
- return 0;\r
- else\r
- return -1;\r
-}\r
-\r
-extern "C" gs_retval_t set_instance_name(gs_sp_t instancename){\r
- instance_name=strdup(instancename);\r
-\r
- return 0;\r
-}\r
-\r
-extern "C" gs_sp_t get_instance_name() {\r
- return instance_name;\r
-}\r
-\r
-// save gshub endpoint\r
-extern "C" gs_retval_t set_hub(endpoint gshub) {\r
- hub.ip = gshub.ip;\r
- hub.port = gshub.port;\r
-\r
- return 0;\r
-}\r
-\r
-// retrieve gsbub endpoint\r
-extern "C" gs_retval_t get_hub(endpoint* gshub) {\r
-\r
- if (hub.ip==0)\r
- return -1;\r
- gshub->ip = hub.ip;\r
- gshub->port = hub.port;\r
-\r
- return 0;\r
-}\r
-\r
-// Discover gs instance endpoint by name.\r
-extern "C" gs_retval_t get_instance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {\r
- char url[MAX_URL_LEN];\r
-\r
- sprintf(url, "%s/%s", DISCOVER_INSTANCE_URL, instance_name);\r
-\r
- return get_service_endpoint(gshub, url, instance, block);\r
-}\r
-\r
-// Discover initialized gs instance endpoint by name.\r
-extern "C" gs_retval_t get_initinstance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {\r
- char url[MAX_URL_LEN];\r
-\r
- sprintf(url, "%s/%s", DISCOVER_INITINSTANCE_URL, instance_name);\r
-\r
- return get_service_endpoint(gshub, url, instance, block);\r
-}\r
-\r
-// Discover stream source endpoint by name.\r
-extern "C" gs_retval_t get_streamsource(endpoint gshub, gs_sp_t source_name, endpoint* source, gs_bool_t block) {\r
- char url[MAX_URL_LEN];\r
-\r
- sprintf(url, "%s/%s", DISCOVER_SOURCE_URL, source_name);\r
-\r
- return get_service_endpoint(gshub, url, source, block);\r
-}\r
-\r
-// Discover stream sink endpoint by name.\r
-extern "C" gs_retval_t get_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint* sink, gs_bool_t block) {\r
- char url[MAX_URL_LEN];\r
-\r
- sprintf(url, "%s/%s", DISCOVER_SINK_URL, sink_name);\r
-\r
- return get_service_endpoint(gshub, url, sink, block);\r
-}\r
-\r
-// Discover if an isntance should start processing\r
-gs_retval_t get_startprocessing(endpoint gshub, gs_sp_t instance_name, gs_bool_t block) {\r
- char url[MAX_URL_LEN];\r
-\r
- sprintf(url, "%s/%s", DISCOVER_STARTPROCESSING_URL, instance_name);\r
-\r
- return get_service_endpoint(gshub, url, NULL, block);\r
-}\r
-\r
-// Announce gs instance endpoint to gshub\r
-extern "C" gs_retval_t set_instance(endpoint gshub, gs_sp_t instance_name, endpoint instance) {\r
- char ipstr[16];\r
- char info[MAX_JSON_LEN];\r
-\r
- inet_ntop(AF_INET, &instance.ip, ipstr, INET_ADDRSTRLEN);\r
- sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", instance_name, ipstr, ntohs(instance.port));\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_INSTANCE_URL, info);\r
-}\r
-\r
-// Announce initialized gs instance endpoint to gshub\r
-extern "C" gs_retval_t set_initinstance(endpoint gshub, gs_sp_t instance_name) {\r
- char info[MAX_JSON_LEN];\r
-\r
- sprintf(info, "{\"name\": \"%s\"}", instance_name);\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_INITINSTANCE_URL, info);\r
-}\r
-\r
-// Announce stream source endpoint to gshub\r
-extern "C" gs_retval_t set_streamsource(endpoint gshub, gs_sp_t source_name, endpoint source) {\r
- char ipstr[16];\r
- char info[MAX_JSON_LEN];\r
-\r
- inet_ntop(AF_INET, &source.ip, ipstr, INET_ADDRSTRLEN);\r
- sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", source_name, ipstr, ntohs(source.port));\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_SOURCE_URL, info);\r
-}\r
-\r
-// Announce stream source endpoint to gshub\r
-extern "C" gs_retval_t set_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint sink) {\r
- char ipstr[16];\r
- char info[MAX_JSON_LEN];\r
-\r
- inet_ntop(AF_INET, &sink.ip, ipstr, INET_ADDRSTRLEN);\r
- sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", sink_name, ipstr, ntohs(sink.port));\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_SINK_URL, info);\r
-}\r
-\r
-// Announce to gshub that an instance can start processin\r
-extern "C" gs_retval_t set_startprocessing(endpoint gshub, gs_sp_t instance_name) {\r
- char info[MAX_JSON_LEN];\r
-\r
- sprintf(info, "{\"name\": \"%s\"}", instance_name);\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_STARTPROCESSING_URL, info);\r
-}\r
-\r
-// Announce stream subscriptino to gshub\r
-extern "C" gs_retval_t set_streamsubscription(endpoint gshub, gs_sp_t instance_name, gs_sp_t sink_name) {\r
- char info[MAX_JSON_LEN];\r
-\r
- sprintf(info, "{\"name\": \"%s\", \"sink\": \"%s\"}", instance_name, sink_name);\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_STREAM_SUBSCRIPTION, info);\r
-}\r
-\r
-// Announce new fta instantiation to gshub\r
-extern "C" gs_retval_t set_ftainstance(endpoint gshub, gs_sp_t instance_name, gs_sp_t ftainstance_name, FTAID* id) {\r
- char info[MAX_JSON_LEN];\r
-\r
- sprintf(info, "{\"name\": \"%s\", \"fta_name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %lli}}",\r
- instance_name, ftainstance_name, id->ip, id->port, id->index, id->streamid);\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_FTA_INSTANCE, info);\r
-}\r
-\r
-// Announce fta instance stats to gshub\r
-extern "C" gs_retval_t set_instancestats(endpoint gshub, gs_sp_t instance_name, fta_stat* stats) {\r
- char url[MAX_URL_LEN];\r
- char info[MAX_JSON_LEN];\r
-\r
- sprintf(info, "{\"name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %llu}, "\r
- "\"metrics\": {\"in_tuple_cnt\": %u, \"out_tuple_cnt\": %u, \"out_tuple_sz\": %u, \"accepted_tuple_cnt\": %u, \"cycle_cnt\": %llu, \"collision_cnt\": %u, \"eviction_cnt\": %u, \"sampling_rate\": %f}}",\r
- instance_name, stats->ftaid.ip, stats->ftaid.port, stats->ftaid.index, stats->ftaid.streamid,\r
- stats->in_tuple_cnt, stats->out_tuple_cnt, stats->out_tuple_sz, stats->accepted_tuple_cnt, stats->cycle_cnt, stats->collision_cnt, stats->eviction_cnt, stats->sampling_rate);\r
-\r
- return set_endpoint_info(gshub, ANNOUNCE_METRICS, info);\r
-}\r
-\r
+/* ------------------------------------------------
+Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+#include "gshub.h"
+#include "gslog.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+
+#include "simple_http.h"
+#include "json.h"
+
+// maximum length of URL for http requests
+#define MAX_URL_LEN 64 * 1024
+
+// maximu length of JSON response
+#define MAX_JSON_LEN 64 * 1024
+
+// sleep time between HTTP request retries
+#define HTTP_RETRY_INTERVAL 5
+
+// GSHUB endpoint
+endpoint hub;
+
+// GSHUB instance_name
+gs_sp_t instance_name=0;
+
+
+// retrieve the endpoint of gs instance, source or sink identified by url
+gs_retval_t get_service_endpoint (endpoint gshub, gs_csp_t url, endpoint* instance, gs_bool_t block) {
+ int res, ret;
+ gs_uint32_t http_code;
+ char json[MAX_JSON_LEN];
+
+ while (true) {
+ res = http_get_request(gshub, url, &http_code, json);
+ if(res) {
+ fprintf(stderr, "http_get_request() failed\n");
+ ret = -1;
+ }
+ else {
+ // in blocking mode we will keep retrying
+ if (http_code == 400) {
+ if (block) {
+ sleep(HTTP_RETRY_INTERVAL);
+ continue;
+ } else {
+ ret = 1;
+ break;
+ }
+
+ }
+ if (http_code == 200 ) {
+ // if instance is NULL there is no need to parse json
+ if (!instance) {
+ ret = 0;
+ break;
+ }
+
+ // now parse json response
+ char *errorPos = 0;
+ char *errorDesc = 0;
+ int errorLine = 0;
+ block_allocator allocator(1 << 10); // 1 KB per block
+
+ json_value *root = json_parse(json, &errorPos, (const char**)&errorDesc, &errorLine, &allocator);
+ if (!root) {
+ fprintf(stderr, "GSHUB returned invalid json response, error description - %s, error line - %d\n", errorDesc, errorLine);
+ ret = -1;
+ } else {
+ unsigned long json_ip = 0;
+ bool ip_set = false;
+ unsigned json_port = 0;
+ bool port_set = false;
+ for (json_value *it = root->first_child; it; it = it->next_sibling) {
+ if (it->name) {
+ if (!strcmp(it->name, "ip") && it->type == JSON_STRING) {
+ inet_pton(AF_INET, it->string_value, &json_ip);
+ ip_set = true;
+ } else if (!strcmp(it->name, "port") && it->type == JSON_INT) {
+ json_port = htons(it->int_value);
+ port_set = true;
+ }
+ }
+ }
+
+ if (!ip_set || !port_set) {
+ fprintf(stderr, "GSHUB returned json response with missing ip or port fields");
+ ret = -1;
+ }
+ instance->ip = json_ip;
+ instance->port = json_port;
+
+ ret = 0;
+ }
+ } else
+ ret = -1;
+ }
+ break;
+ }
+
+ return ret;
+
+}
+
+// announce gs instance/sink/source to gshub. GSHUB is identified by URL, instance/sink/source information is in application/json
+gs_retval_t set_endpoint_info (endpoint gshub, gs_csp_t url, char* info) {
+ int res;
+ gs_uint32_t http_code;
+
+ res = http_post_request(gshub, url, info, &http_code);
+ if (res) {
+ fprintf(stderr, "http_post_request() failed\n");
+ return -1;
+ }
+
+ if (http_code == 200)
+ return 0;
+ else
+ return -1;
+}
+
+extern "C" gs_retval_t set_instance_name(gs_sp_t instancename){
+ instance_name=strdup(instancename);
+
+ return 0;
+}
+
+extern "C" gs_sp_t get_instance_name() {
+ return instance_name;
+}
+
+// save gshub endpoint
+extern "C" gs_retval_t set_hub(endpoint gshub) {
+ hub.ip = gshub.ip;
+ hub.port = gshub.port;
+
+ return 0;
+}
+
+// retrieve gsbub endpoint
+extern "C" gs_retval_t get_hub(endpoint* gshub) {
+
+ if (hub.ip==0)
+ return -1;
+ gshub->ip = hub.ip;
+ gshub->port = hub.port;
+
+ return 0;
+}
+
+// Discover gs instance endpoint by name.
+extern "C" gs_retval_t get_instance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {
+ char url[MAX_URL_LEN];
+
+ sprintf(url, "%s/%s", DISCOVER_INSTANCE_URL, instance_name);
+
+ return get_service_endpoint(gshub, url, instance, block);
+}
+
+// Discover initialized gs instance endpoint by name.
+extern "C" gs_retval_t get_initinstance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {
+ char url[MAX_URL_LEN];
+
+ sprintf(url, "%s/%s", DISCOVER_INITINSTANCE_URL, instance_name);
+
+ return get_service_endpoint(gshub, url, instance, block);
+}
+
+// Discover stream source endpoint by name.
+extern "C" gs_retval_t get_streamsource(endpoint gshub, gs_sp_t source_name, endpoint* source, gs_bool_t block) {
+ char url[MAX_URL_LEN];
+
+ sprintf(url, "%s/%s", DISCOVER_SOURCE_URL, source_name);
+
+ return get_service_endpoint(gshub, url, source, block);
+}
+
+// Discover stream sink endpoint by name.
+extern "C" gs_retval_t get_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint* sink, gs_bool_t block) {
+ char url[MAX_URL_LEN];
+
+ sprintf(url, "%s/%s", DISCOVER_SINK_URL, sink_name);
+
+ return get_service_endpoint(gshub, url, sink, block);
+}
+
+// Discover if an isntance should start processing
+gs_retval_t get_startprocessing(endpoint gshub, gs_sp_t instance_name, gs_bool_t block) {
+ char url[MAX_URL_LEN];
+
+ sprintf(url, "%s/%s", DISCOVER_STARTPROCESSING_URL, instance_name);
+
+ return get_service_endpoint(gshub, url, NULL, block);
+}
+
+// Announce gs instance endpoint to gshub
+extern "C" gs_retval_t set_instance(endpoint gshub, gs_sp_t instance_name, endpoint instance) {
+ char ipstr[16];
+ char info[MAX_JSON_LEN];
+
+ inet_ntop(AF_INET, &instance.ip, ipstr, INET_ADDRSTRLEN);
+ sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", instance_name, ipstr, ntohs(instance.port));
+
+ return set_endpoint_info(gshub, ANNOUNCE_INSTANCE_URL, info);
+}
+
+// Announce initialized gs instance endpoint to gshub
+extern "C" gs_retval_t set_initinstance(endpoint gshub, gs_sp_t instance_name) {
+ char info[MAX_JSON_LEN];
+
+ sprintf(info, "{\"name\": \"%s\"}", instance_name);
+
+ return set_endpoint_info(gshub, ANNOUNCE_INITINSTANCE_URL, info);
+}
+
+// Announce stream source endpoint to gshub
+extern "C" gs_retval_t set_streamsource(endpoint gshub, gs_sp_t source_name, endpoint source) {
+ char ipstr[16];
+ char info[MAX_JSON_LEN];
+
+ inet_ntop(AF_INET, &source.ip, ipstr, INET_ADDRSTRLEN);
+ sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", source_name, ipstr, ntohs(source.port));
+
+ return set_endpoint_info(gshub, ANNOUNCE_SOURCE_URL, info);
+}
+
+// Announce stream source endpoint to gshub
+extern "C" gs_retval_t set_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint sink) {
+ char ipstr[16];
+ char info[MAX_JSON_LEN];
+
+ inet_ntop(AF_INET, &sink.ip, ipstr, INET_ADDRSTRLEN);
+ sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", sink_name, ipstr, ntohs(sink.port));
+
+ return set_endpoint_info(gshub, ANNOUNCE_SINK_URL, info);
+}
+
+// Announce to gshub that an instance can start processin
+extern "C" gs_retval_t set_startprocessing(endpoint gshub, gs_sp_t instance_name) {
+ char info[MAX_JSON_LEN];
+
+ sprintf(info, "{\"name\": \"%s\"}", instance_name);
+
+ return set_endpoint_info(gshub, ANNOUNCE_STARTPROCESSING_URL, info);
+}
+
+// Announce stream subscriptino to gshub
+extern "C" gs_retval_t set_streamsubscription(endpoint gshub, gs_sp_t instance_name, gs_sp_t sink_name) {
+ char info[MAX_JSON_LEN];
+
+ sprintf(info, "{\"name\": \"%s\", \"sink\": \"%s\"}", instance_name, sink_name);
+
+ return set_endpoint_info(gshub, ANNOUNCE_STREAM_SUBSCRIPTION, info);
+}
+
+// Announce new fta instantiation to gshub
+extern "C" gs_retval_t set_ftainstance(endpoint gshub, gs_sp_t instance_name, gs_sp_t ftainstance_name, FTAID* id) {
+ char info[MAX_JSON_LEN];
+
+ sprintf(info, "{\"name\": \"%s\", \"fta_name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %lli}}",
+ instance_name, ftainstance_name, id->ip, id->port, id->index, id->streamid);
+
+ return set_endpoint_info(gshub, ANNOUNCE_FTA_INSTANCE, info);
+}
+
+// Announce fta instance stats to gshub
+extern "C" gs_retval_t set_instancestats(endpoint gshub, gs_sp_t instance_name, fta_stat* stats) {
+ char url[MAX_URL_LEN];
+ char info[MAX_JSON_LEN];
+
+ sprintf(info, "{\"name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %llu}, "
+ "\"metrics\": {\"in_tuple_cnt\": %u, \"out_tuple_cnt\": %u, \"out_tuple_sz\": %u, \"accepted_tuple_cnt\": %u, \"cycle_cnt\": %llu, \"collision_cnt\": %u, \"eviction_cnt\": %u, \"sampling_rate\": %f}}",
+ instance_name, stats->ftaid.ip, stats->ftaid.port, stats->ftaid.index, stats->ftaid.streamid,
+ stats->in_tuple_cnt, stats->out_tuple_cnt, stats->out_tuple_sz, stats->accepted_tuple_cnt, stats->cycle_cnt, stats->collision_cnt, stats->eviction_cnt, stats->sampling_rate);
+
+ return set_endpoint_info(gshub, ANNOUNCE_METRICS, info);
+}
+