Initial commit
[com/gs-lite.git] / src / lib / gscpaux / gshub.cpp
1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3    Licensed under the Apache License, Version 2.0 (the "License");
4    you may not use this file except in compliance with the License.
5    You may obtain a copy of the License at
6
7      http://www.apache.org/licenses/LICENSE-2.0
8
9    Unless required by applicable law or agreed to in writing, software
10    distributed under the License is distributed on an "AS IS" BASIS,
11    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12    See the License for the specific language governing permissions and
13    limitations under the License.
14  ------------------------------------------- */
15
16 #include "gshub.h"
17 #include "gslog.h"
18
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <arpa/inet.h>
23 #include <unistd.h>
24
25 #include "simple_http.h"
26 #include "json.h"
27
// maximum length of URL for http requests
// (parenthesized so the macro expands safely inside larger expressions)
#define MAX_URL_LEN (64 * 1024)

// maximum length of JSON response
// (parenthesized so the macro expands safely inside larger expressions)
#define MAX_JSON_LEN (64 * 1024)

// sleep time between HTTP request retries (seconds, passed to sleep())
#define HTTP_RETRY_INTERVAL 5
36
37 // GSHUB endpoint
38 endpoint hub;
39
40 // GSHUB instance_name
41 gs_sp_t instance_name=0;
42
43
44 // retrieve the endpoint of gs instance, source or sink identified by url
45 gs_retval_t get_service_endpoint (endpoint gshub, gs_csp_t url, endpoint* instance, gs_bool_t block) {
46         int res, ret;
47         gs_uint32_t http_code;
48         char json[MAX_JSON_LEN];
49
50         while (true) {
51                 res = http_get_request(gshub, url, &http_code, json);
52                 if(res) {
53                         fprintf(stderr, "http_get_request() failed\n");
54                         ret = -1;
55                 }
56                 else {
57                         // in blocking mode we will keep retrying
58                         if (http_code == 400) {
59                                 if (block) {
60                                         sleep(HTTP_RETRY_INTERVAL);
61                                         continue;
62                                 } else {
63                                         ret = 1;
64                                         break;
65                                 }
66
67                         }
68                         if (http_code == 200 ) {
69                                 // if instance is NULL there is no need to parse json
70                                 if (!instance) {
71                                         ret = 0;
72                                         break;
73                                 }
74
75                                 // now parse json response
76                                 char *errorPos = 0;
77                                 char *errorDesc = 0;
78                                 int errorLine = 0;
79                                 block_allocator allocator(1 << 10); // 1 KB per block
80
81                                 json_value *root = json_parse(json, &errorPos, (const char**)&errorDesc, &errorLine, &allocator);
82                                 if (!root) {
83                                         fprintf(stderr, "GSHUB returned invalid json response, error description - %s, error line - %d\n", errorDesc, errorLine);
84                                         ret = -1;
85                                 } else {
86                                         unsigned long json_ip = 0;
87                                         bool ip_set = false;
88                                         unsigned json_port = 0;
89                                         bool port_set = false;
90                                         for (json_value *it = root->first_child; it; it = it->next_sibling) {
91                                                 if (it->name) {
92                                                         if (!strcmp(it->name, "ip") && it->type == JSON_STRING) {
93                                                                 inet_pton(AF_INET, it->string_value, &json_ip);
94                                                                 ip_set = true;
95                                                         } else if (!strcmp(it->name, "port") && it->type == JSON_INT) {
96                                                                 json_port = htons(it->int_value);
97                                                                 port_set = true;
98                                                         }
99                                                 }
100                                         }
101
102                                         if (!ip_set || !port_set) {
103                                                 fprintf(stderr, "GSHUB returned json response with missing ip or port fields");
104                                                 ret = -1;
105                                         }
106                                         instance->ip = json_ip;
107                                         instance->port = json_port;
108
109                                         ret = 0;
110                                 }
111                         } else
112                                 ret = -1;
113                 }
114                 break;
115         }
116
117         return ret;
118
119 }
120
121 // announce gs instance/sink/source to gshub. GSHUB is identified by URL, instance/sink/source information is in application/json
122 gs_retval_t set_endpoint_info (endpoint gshub, gs_csp_t url, char* info) {
123         int res;
124         gs_uint32_t http_code;
125
126         res = http_post_request(gshub, url, info, &http_code);
127         if (res) {
128                 fprintf(stderr, "http_post_request() failed\n");
129                 return -1;
130         }
131
132         if (http_code == 200)
133                 return 0;
134         else
135                 return -1;
136 }
137
138 extern "C" gs_retval_t set_instance_name(gs_sp_t instancename){
139         instance_name=strdup(instancename);
140
141         return 0;
142 }
143
144 extern "C" gs_sp_t get_instance_name() {
145         return instance_name;
146 }
147
148 // save gshub endpoint
149 extern "C" gs_retval_t set_hub(endpoint gshub) {
150         hub.ip = gshub.ip;
151         hub.port = gshub.port;
152
153         return 0;
154 }
155
156 // retrieve gsbub endpoint
157 extern "C" gs_retval_t get_hub(endpoint* gshub) {
158
159         if (hub.ip==0)
160                 return -1;
161         gshub->ip = hub.ip;
162         gshub->port = hub.port;
163
164         return 0;
165 }
166
167 // Discover gs instance endpoint by name.
168 extern "C" gs_retval_t get_instance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {
169         char url[MAX_URL_LEN];
170
171         sprintf(url, "%s/%s", DISCOVER_INSTANCE_URL, instance_name);
172
173         return get_service_endpoint(gshub, url, instance, block);
174 }
175
176 // Discover initialized gs instance endpoint by name.
177 extern "C" gs_retval_t get_initinstance(endpoint gshub, gs_sp_t instance_name, endpoint* instance, gs_bool_t block) {
178         char url[MAX_URL_LEN];
179
180         sprintf(url, "%s/%s", DISCOVER_INITINSTANCE_URL, instance_name);
181
182         return get_service_endpoint(gshub, url, instance, block);
183 }
184
185 // Discover stream source endpoint by name.
186 extern "C" gs_retval_t get_streamsource(endpoint gshub, gs_sp_t source_name, endpoint* source, gs_bool_t block) {
187         char url[MAX_URL_LEN];
188
189         sprintf(url, "%s/%s", DISCOVER_SOURCE_URL, source_name);
190
191         return get_service_endpoint(gshub, url, source, block);
192 }
193
194 // Discover stream sink endpoint by name.
195 extern "C" gs_retval_t get_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint* sink, gs_bool_t block) {
196         char url[MAX_URL_LEN];
197
198         sprintf(url, "%s/%s", DISCOVER_SINK_URL, sink_name);
199
200         return get_service_endpoint(gshub, url, sink, block);
201 }
202
203 // Discover if an isntance should start processing
204 gs_retval_t get_startprocessing(endpoint gshub, gs_sp_t instance_name, gs_bool_t block) {
205         char url[MAX_URL_LEN];
206
207         sprintf(url, "%s/%s", DISCOVER_STARTPROCESSING_URL, instance_name);
208
209         return get_service_endpoint(gshub, url, NULL, block);
210 }
211
212 // Announce gs instance endpoint to gshub
213 extern "C" gs_retval_t set_instance(endpoint gshub, gs_sp_t instance_name, endpoint instance) {
214         char ipstr[16];
215         char info[MAX_JSON_LEN];
216
217         inet_ntop(AF_INET, &instance.ip, ipstr, INET_ADDRSTRLEN);
218         sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", instance_name, ipstr, ntohs(instance.port));
219
220         return set_endpoint_info(gshub, ANNOUNCE_INSTANCE_URL, info);
221 }
222
223 // Announce initialized gs instance endpoint to gshub
224 extern "C" gs_retval_t set_initinstance(endpoint gshub, gs_sp_t instance_name) {
225         char info[MAX_JSON_LEN];
226
227         sprintf(info, "{\"name\": \"%s\"}", instance_name);
228
229         return set_endpoint_info(gshub, ANNOUNCE_INITINSTANCE_URL, info);
230 }
231
232 // Announce stream source endpoint to gshub
233 extern "C" gs_retval_t set_streamsource(endpoint gshub, gs_sp_t source_name, endpoint source) {
234         char ipstr[16];
235         char info[MAX_JSON_LEN];
236
237         inet_ntop(AF_INET, &source.ip, ipstr, INET_ADDRSTRLEN);
238         sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", source_name, ipstr, ntohs(source.port));
239
240         return set_endpoint_info(gshub, ANNOUNCE_SOURCE_URL, info);
241 }
242
243 // Announce stream source endpoint to gshub
244 extern "C" gs_retval_t set_streamsink(endpoint gshub, gs_sp_t sink_name, endpoint sink) {
245         char ipstr[16];
246         char info[MAX_JSON_LEN];
247
248         inet_ntop(AF_INET, &sink.ip, ipstr, INET_ADDRSTRLEN);
249         sprintf(info, "{\"name\": \"%s\", \"ip\": \"%s\", \"port\": %d}", sink_name, ipstr, ntohs(sink.port));
250
251         return set_endpoint_info(gshub, ANNOUNCE_SINK_URL, info);
252 }
253
254 // Announce to gshub that an instance can start processin
255 extern "C" gs_retval_t set_startprocessing(endpoint gshub, gs_sp_t instance_name) {
256         char info[MAX_JSON_LEN];
257
258         sprintf(info, "{\"name\": \"%s\"}", instance_name);
259
260         return set_endpoint_info(gshub, ANNOUNCE_STARTPROCESSING_URL, info);
261 }
262
263 // Announce stream subscriptino to gshub
264 extern "C" gs_retval_t set_streamsubscription(endpoint gshub, gs_sp_t instance_name, gs_sp_t sink_name) {
265         char info[MAX_JSON_LEN];
266
267         sprintf(info, "{\"name\": \"%s\", \"sink\": \"%s\"}", instance_name, sink_name);
268
269         return set_endpoint_info(gshub, ANNOUNCE_STREAM_SUBSCRIPTION, info);
270 }
271
272 // Announce new fta instantiation to gshub
273 extern "C" gs_retval_t set_ftainstance(endpoint gshub, gs_sp_t instance_name, gs_sp_t ftainstance_name, FTAID* id) {
274         char info[MAX_JSON_LEN];
275
276         sprintf(info, "{\"name\": \"%s\", \"fta_name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %lli}}",
277                 instance_name, ftainstance_name, id->ip, id->port, id->index, id->streamid);
278
279         return set_endpoint_info(gshub, ANNOUNCE_FTA_INSTANCE, info);
280 }
281
282 // Announce fta instance stats to gshub
283 extern "C" gs_retval_t set_instancestats(endpoint gshub, gs_sp_t instance_name, fta_stat* stats) {
284         char url[MAX_URL_LEN];
285         char info[MAX_JSON_LEN];
286
287         sprintf(info, "{\"name\": \"%s\", \"ftaid\": {\"ip\": %u, \"port\": %u, \"index\": %u, \"streamid\": %llu}, "
288                 "\"metrics\": {\"in_tuple_cnt\": %u, \"out_tuple_cnt\": %u, \"out_tuple_sz\": %u, \"accepted_tuple_cnt\": %u, \"cycle_cnt\": %llu, \"collision_cnt\": %u, \"eviction_cnt\": %u, \"sampling_rate\": %f}}",
289                 instance_name, stats->ftaid.ip, stats->ftaid.port, stats->ftaid.index, stats->ftaid.streamid,
290                 stats->in_tuple_cnt, stats->out_tuple_cnt, stats->out_tuple_sz, stats->accepted_tuple_cnt, stats->cycle_cnt, stats->collision_cnt, stats->eviction_cnt, stats->sampling_rate);
291
292         return set_endpoint_info(gshub, ANNOUNCE_METRICS, info);
293 }
294