1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netinet/in.h>
38 #include "schemaparser.h"
41 void fta_init(gs_sp_t device);
42 void rts_fta_process_packet(struct packet * p);
48 #include <librdkafka/rdkafka.h>
52 gs_uint32_t max_field_kafka = CSVELEMENTS;
54 #define KAFKA_TIMEOUT 1000 // timeout value for getting next batch of records (in ms)
59 static struct packet cur_packet;
60 static gs_sp_t config_fname;
61 static gs_sp_t topics_fname;
64 static size_t line_len;
65 static gs_uint32_t lineend=0;
66 static gs_uint8_t csvdel = ',';
67 static gs_uint32_t verbose=0;
68 static gs_uint32_t startupdelay=0;
70 #define MAX_KAFKA_TOPICS 256
72 static rd_kafka_t *rk;
73 static rd_kafka_conf_t *conf;
74 static rd_kafka_queue_t *rkqu = NULL;
75 static rd_kafka_topic_t *topic_list[MAX_KAFKA_TOPICS];
76 gs_uint32_t num_topics;
78 #include "lfta/csv_parser.h"
80 static int read_topic_list (rd_kafka_t * rk, rd_kafka_queue_t *kqueue, rd_kafka_topic_t **topic_list, int max_topics, const char *fname) {
// Read newline-separated Kafka topic names from fname; for each topic create a
// handle in topic_list[] and start the legacy consumer on partition 0 from the
// tail of the log (RD_KAFKA_OFFSET_END), routing messages into kqueue.
// Presumably returns the number of topics registered (the caller assigns the
// result to num_topics) — return path not visible in this listing; confirm.
// NOTE(review): this listing is elided — declarations of fp/line/buf and the
// error-return branches are not visible here.
85 if (!(fp = fopen(fname, "r"))) {
86 fprintf(stderr, "Unable to open kafka topic list file %s\n", fname);
90 while (line < max_topics && fgets(buf, sizeof(buf), fp)) {
91 strtok(buf, " \t\r\n"); // truncate the whitespace and end of line
92 topic_list[line] = rd_kafka_topic_new(rk, buf, NULL);
93 int r = rd_kafka_consume_start_queue(topic_list[line], 0, RD_KAFKA_OFFSET_END, kqueue);
95 fprintf(stderr, "Unable to add topic %s to queue: %s\n", buf, rd_kafka_err2str(rd_kafka_last_error()));
106 static int read_conf_file (rd_kafka_conf_t *conf, const char *fname) {
// Parse a "key=value"-per-line Kafka configuration file and apply each
// property to conf via rd_kafka_conf_set().  Leading whitespace is skipped;
// empty lines and lines starting with '#' are ignored; a trailing newline is
// stripped before the '=' split.
// NOTE(review): this listing is elided — declarations of fp/buf/s/t/line/
// errstr, the '=' -> NUL split that advances t past the key, and the
// error-return branches are not visible here.
112 if (!(fp = fopen(fname, "r"))) {
113 fprintf(stderr, "Unable to open kafka configuration file %s\n", fname);
117 while (fgets(buf, sizeof(buf), fp)) {
120 rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
123 while (isspace((int)*s))
125 if (!*s || *s == '#')
128 if ((t = strchr(buf, '\n')))
130 t = strchr(buf, '=');
131 if (!t || t == s || !*(t+1)) {
132 fprintf(stderr, "Error reading kafka config file %s:%d: expected key=value\n", fname, line);
138 // set config property
139 r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));
140 if (r == RD_KAFKA_CONF_OK)
// Fixed garbled diagnostic: message previously read "Unable set to set".
143 fprintf(stderr, "Unable to set kafka configuration property %s:%d: %s=%s: %s\n", fname, line, s, t, errstr);
152 static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
// Per-message consumer callback: CSV-parse the message payload into the
// file-scope cur_packet (csv_parse_line fills the parser state) and hand the
// packet to the runtime via rts_fta_process_packet().  Error events are
// filtered out first; RD_KAFKA_RESP_ERR__PARTITION_EOF just means we have
// caught up with the partition tail.  opaque is unused here.
// NOTE(review): listing elided — the return/else paths are not visible.
154 if (rkmessage->err) {
155 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
156 // caught up with the data
161 csv_parse_line(rkmessage->payload, rkmessage->len);
162 rts_fta_process_packet(&cur_packet);
165 static gs_retval_t kafka_replay_init(gs_sp_t device)
// Initialize the Kafka replay interface from the device's interface
// properties: "verbose", "kafkaconfig" (config file path), "kafkatopics"
// (topic list file path), "csvseparator" and "startupdelay".  Builds the
// rd_kafka consumer handle, the consume queue, and registers all topics.
// NOTE(review): listing elided — error returns, the csvdel assignment from
// tempdel, and this function's return statement are not visible here.
171 if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
172 if (strncmp(verbosetmp,"TRUE",4)==0) {
174 fprintf(stderr,"VERBOSE ENABLED\n");
176 fprintf(stderr,"VERBOSE DISABLED\n");
180 if ((config_fname=get_iface_properties(device,(gs_sp_t)"kafkaconfig"))==0) {
181 print_error((gs_sp_t)"kafka_replay_init::No \"kafkaconfig\" defined");
185 if ((topics_fname=get_iface_properties(device,(gs_sp_t)"kafkatopics"))==0) {
186 print_error((gs_sp_t)"kafka_replay_init::No \"kafkatopics\" defined");
190 tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
193 csv_set_delim(csvdel);
196 if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
198 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
200 startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
203 // set maximum field number to be extracted by csv parser
204 csv_set_maxfield(max_field_kafka);
206 cur_packet.ptype=PTYPE_CSV;
210 // load Kafka configuration from config file
211 conf = rd_kafka_conf_new();
212 read_conf_file(conf, config_fname);
214 // create new Kafka handle using configuration settings
215 if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
216 fprintf(stderr, "Unable to create new Kafka consumer: %s\n", errstr);
220 // load topic list from the file and setup a kafka queue to consume them
221 rkqu = rd_kafka_queue_new(rk);
222 num_topics = read_topic_list(rk, rkqu, topic_list, MAX_KAFKA_TOPICS, topics_fname);
224 fprintf(stderr, "Empty list of Kafka topics\n");
231 static gs_retval_t kafka_process_input()
// Drain the shared consume queue in batches (up to 50000 tuples per call,
// each rd_kafka_consume_callback_queue() dispatching via msg_consume with a
// KAFKA_TIMEOUT ms timeout), returning 0 on timeout so the caller can service
// the message queue.  Once consumption ends it tears down topics, queue and
// the rd_kafka handle, then loops servicing runtime messages.
// NOTE(review): listing elided — declarations of cnt/retval/i, the loop exits
// and the surrounding control flow of the shutdown loop are not visible here.
234 static unsigned totalcnt = 0;
237 while(cnt < 50000) { // process up to 50000 tuples at a time
238 retval = rd_kafka_consume_callback_queue(rkqu, KAFKA_TIMEOUT, msg_consume, NULL);
239 if (retval == 0) return 0; // got a timeout so service message queue
243 // stop consuming from topics
244 for (i=0 ; i<num_topics ; ++i) {
245 int r = rd_kafka_consume_stop(topic_list[i], 0);
// Fixed wrong word in diagnostic: message previously began "Enable to stop".
247 fprintf(stderr, "Unable to stop consuming from topic %s\n", rd_kafka_err2str(rd_kafka_last_error()));
252 rd_kafka_queue_destroy(rkqu);
255 for (i=0 ; i<num_topics ; ++i) {
256 rd_kafka_topic_destroy(topic_list[i]);
259 // destroy Kafka handle
260 rd_kafka_destroy(rk);
262 // we signal that everything is done
264 fprintf(stderr,"Done processing, waiting for things to shut down\n");
266 // now just service message queue until we get killed or lose connectivity
268 fta_start_service(0); // service all waiting messages
269 usleep(1000); // sleep a millisecond
274 totalcnt = totalcnt + cnt;
276 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
281 gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
// Entry point for the Kafka-backed runtime: initialize the replay interface
// and host library, run the configured startup delay while servicing the
// message queue, wait for the GSHUB start signal, then loop forever consuming
// Kafka input and servicing runtime messages.
// NOTE(review): listing elided — declarations of cont/mygshub/st_time, error
// exits, and the enclosing loop braces are not visible here.
287 kafka_replay_init(device);
289 /* initialize host_lib */
291 fprintf(stderr,"Init LFTAs for %s\n",device);
294 if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
295 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
300 fta_init(device); /*xxx probably should get error code back but Ted doesn't give me one*/
302 // set maximum field number to be extracted by csv parser
303 csv_set_maxfield(max_field_kafka);
305 cont = startupdelay + time(0);
307 if (verbose) { fprintf(stderr,"Start startup delay"); }
309 while (cont > time(NULL)) {
310 if (fta_start_service(0) < 0) {
311 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
314 usleep(1000); /* sleep for one millisecond */
317 if (verbose) { fprintf(stderr,"... Done\n"); }
319 // wait to process till we get the signal from GSHUB
320 if (get_hub(&mygshub) != 0) {
321 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
324 while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
326 if (fta_start_service(0) < 0) {
327 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
332 /* now we enter an endless loop to process data */
334 fprintf(stderr,"Start processing %s\n",device);
337 st_time = time(NULL);
339 if (kafka_process_input() < 0) {
340 fprintf(stderr,"%s::error:in processing records\n", device);
343 /* process all messages on the message queue*/
344 if (fta_start_service(0) < 0) {
345 fprintf(stderr,"%s::error:in processing the msg queue\n", device);
354 // This is a stub entry point to ensure proper linking when Kafka support is not enabled
// NOTE(review): listing elided — this stub's return statement and closing
// brace, and the matching #ifdef/#else for KAFKA_ENABLED, are not visible
// here; presumably it returns an error code after printing.  Confirm against
// the full file.
355 gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
356 fprintf(stderr,"ERROR: runtime built without Kafka support.\n");
362 #endif // KAFKA_ENABLED