1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netinet/in.h>
38 #include "schemaparser.h"
/* Forward declarations supplied by the generated LFTA runtime. */
void fta_init(gs_sp_t device);                  /* initialize all LFTAs bound to this device */
void rts_fta_process_packet(struct packet * p); /* push one parsed packet through the LFTAs */
48 #include <librdkafka/rdkafka.h>
/* Highest CSV field index the parser must extract; may be lowered from
   the "_max_csv_pos" interface property. */
gs_uint32_t max_field_kafka = CSVELEMENTS;

#define KAFKA_TIMEOUT 1000 // timeout value for getting next batch of records (in ms)

static struct packet cur_packet;    /* reusable packet handed to rts_fta_process_packet() */
static gs_sp_t config_fname;        /* kafka config file name, from "kafkaconfig" property */
static gs_sp_t topics_fname;        /* topic list file name, from "kafkatopics" property */
static size_t line_len;
static gs_uint32_t lineend=0;
static gs_uint8_t csvdel = ',';     /* CSV field delimiter; overridable via "csvseparator" */
static gs_uint32_t verbose=0;       /* nonzero when "verbose" property is TRUE */
static gs_uint32_t startupdelay=0;  /* seconds to service the msg queue before consuming */

#define MAX_KAFKA_TOPICS 256

static rd_kafka_t *rk;                                 /* kafka consumer handle */
static rd_kafka_conf_t *conf;                          /* kafka configuration object */
static rd_kafka_queue_t *rkqu = NULL;                  /* single queue all topics feed into */
static rd_kafka_topic_t *topic_list[MAX_KAFKA_TOPICS]; /* topics being consumed */
gs_uint32_t num_topics;                                /* number of valid entries in topic_list */
78 #include "lfta/csv_parser.h"
/* Read topic names (one per line) from `fname`, create an rd_kafka_topic_t
 * for each, and start consuming every topic at RD_KAFKA_OFFSET_END (tail)
 * into the shared queue `kqueue`, up to `max_topics` entries.
 * NOTE(review): this extract is missing interior lines (declarations of
 * fp/buf/line, error returns, closing braces) — presumably returns the
 * number of topics read; confirm against the full source. */
static int read_topic_list (rd_kafka_t * rk, rd_kafka_queue_t *kqueue, rd_kafka_topic_t **topic_list, int max_topics, const char *fname) {
	if (!(fp = fopen(fname, "r"))) {
		fprintf(stderr, "Unable to open kafka topic list file %s\n", fname);
	while (line < max_topics && fgets(buf, sizeof(buf), fp)) {
		strtok(buf, " \t\r\n"); // truncate the whitespace and end of line
		topic_list[line] = rd_kafka_topic_new(rk, buf, NULL);
		/* partition 0, tail of the log, routed to the shared queue */
		int r = rd_kafka_consume_start_queue(topic_list[line], 0, RD_KAFKA_OFFSET_END, kqueue);
		fprintf(stderr, "Unable to add topic %s to queue: %s\n", buf, rd_kafka_err2str(rd_kafka_last_error()));
106 static int read_conf_file (rd_kafka_conf_t *conf, const char *fname) {
112 if (!(fp = fopen(fname, "r"))) {
113 fprintf(stderr, "Unable to open kafka configuration file %s\n", fname);
117 while (fgets(buf, sizeof(buf), fp)) {
120 rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
123 while (isspace((int)*s))
125 if (!*s || *s == '#')
128 if ((t = strchr(buf, '\n')))
130 t = strchr(buf, '=');
131 if (!t || t == s || !*(t+1)) {
132 fprintf(stderr, "Error reading kafka config file %s:%d: expected key=value\n", fname, line);
138 // set config property
139 r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));
140 if (r == RD_KAFKA_CONF_OK)
143 fprintf(stderr, "Unable set to set kafka configuration property %s:%d: %s=%s: %s\n", fname, line, s, t, errstr);
/* Per-message callback for rd_kafka_consume_callback_queue(): on a real
 * message, run the CSV parser over the payload and push the resulting
 * cur_packet through the LFTAs; partition-EOF errors just mean we are
 * caught up with the log.
 * NOTE(review): interior lines are missing from this extract (error
 * handling bodies, closing braces) — confirm against the full source. */
static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
	if (rkmessage->err) {
		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
			// caught up with the data
	/* parse the CSV payload into cur_packet, then hand it to the LFTAs */
	csv_parse_line(rkmessage->payload, rkmessage->len);
	rts_fta_process_packet(&cur_packet);
/* Initialize the kafka replay interface for `device`: read the interface
 * properties (verbose, kafkaconfig, kafkatopics, csvseparator,
 * startupdelay, _max_csv_pos), configure the CSV parser, build the kafka
 * consumer handle from the config file, and subscribe every listed topic
 * to a single consume queue (rkqu).
 * NOTE(review): interior lines are missing from this extract (local
 * declarations, else-branches, returns, closing braces) — confirm against
 * the full source. */
static gs_retval_t kafka_replay_init(gs_sp_t device)
	if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
		if (strncmp(verbosetmp,"TRUE",4)==0) {
			fprintf(stderr,"VERBOSE ENABLED\n");
			fprintf(stderr,"VERBOSE DISABLED\n");
	/* both kafkaconfig and kafkatopics are mandatory properties */
	if ((config_fname=get_iface_properties(device,(gs_sp_t)"kafkaconfig"))==0) {
		print_error((gs_sp_t)"kafka_replay_init::No \"kafkaconfig\" defined");
	if ((topics_fname=get_iface_properties(device,(gs_sp_t)"kafkatopics"))==0) {
		print_error((gs_sp_t)"kafka_replay_init::No \"kafkatopics\" defined");
	/* optional override of the default ',' CSV delimiter */
	tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
	csv_set_delim(csvdel);
	if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
		fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
		startupdelay=atoi(delaytmp);
	if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
		max_field_kafka=atoi(maxfieldtmp);
	// set maximum field number to be extracted by csv parser
	csv_set_maxfield(max_field_kafka);
	cur_packet.ptype=PTYPE_CSV;
	// load Kafka configuration from config file
	conf = rd_kafka_conf_new();
	read_conf_file(conf, config_fname);
	// create new Kafka handle using configuration settings
	if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
		fprintf(stderr, "Unable to create new Kafka consumer: %s\n", errstr);
	// load topic list from the file and setup a kafka queue to consume them
	rkqu = rd_kafka_queue_new(rk);
	num_topics = read_topic_list(rk, rkqu, topic_list, MAX_KAFKA_TOPICS, topics_fname);
	fprintf(stderr, "Empty list of Kafka topics\n");
236 static gs_retval_t kafka_process_input()
239 static unsigned totalcnt = 0;
242 while(cnt < 50000) { // process up to 50000 tuples at a time
243 retval = rd_kafka_consume_callback_queue(rkqu, KAFKA_TIMEOUT, msg_consume, NULL);
244 if (retval == 0) return 0; // got a timeout so service message queue
248 // stop consuming from topics
249 for (i=0 ; i<num_topics ; ++i) {
250 int r = rd_kafka_consume_stop(topic_list[i], 0);
252 fprintf(stderr, "Enable to stop consuming from topic %s\n", rd_kafka_err2str(rd_kafka_last_error()));
257 rd_kafka_queue_destroy(rkqu);
260 for (i=0 ; i<num_topics ; ++i) {
261 rd_kafka_topic_destroy(topic_list[i]);
264 // destroy Kafka handle
265 rd_kafka_destroy(rk);
267 // we signal that everything is done
269 fprintf(stderr,"Done processing, waiting for things to shut down\n");
271 // now just service message queue until we get killed or loose connectivity
273 fta_start_service(0); // service all waiting messages
274 usleep(1000); // sleep a millisecond
279 totalcnt = totalcnt + cnt;
281 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
/* Entry point for the kafka data source: initialize the replay interface
 * and host library, start the LFTAs, honor the configured startup delay
 * (servicing the message queue while waiting), wait for the GSHUB
 * start-processing signal, then alternate between consuming kafka input
 * and servicing the message queue forever.
 * NOTE(review): interior lines are missing from this extract (local
 * declarations of cont/mygshub/st_time, exit calls, closing braces) —
 * confirm against the full source. */
gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
	kafka_replay_init(device);
	/* initialize host_lib */
	fprintf(stderr,"Init LFTAs for %s\n",device);
	if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
		fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
	fta_init(device); /*xxx probably should get error code back but Ted doesn't give me one*/
	// set maximum field number to be extracted by csv parser
	csv_set_maxfield(max_field_kafka);
	/* service the message queue for `startupdelay` seconds before consuming */
	cont = startupdelay + time(0);
	if (verbose) { fprintf(stderr,"Start startup delay"); }
	while (cont > time(NULL)) {
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
		usleep(1000); /* sleep for one millisecond */
	if (verbose) { fprintf(stderr,"... Done\n"); }
	// wait to process till we get the signal from GSHUB
	if (get_hub(&mygshub) != 0) {
		print_error((gs_sp_t)"ERROR:could not find gshub for data source");
	while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
	/* now we enter an endless loop to process data */
	fprintf(stderr,"Start processing %s\n",device);
	st_time = time(NULL);
	if (kafka_process_input() < 0) {
		fprintf(stderr,"%s::error:in processing records\n", device);
	/* process all messages on the message queue*/
	if (fta_start_service(0) < 0) {
		fprintf(stderr,"%s::error:in processing the msg queue\n", device);
// This is a stub entry point to ensure proper linking when Kafka support is not enabled
/* NOTE(review): the #else that selects this stub over the real main_kafka,
 * and its return/closing brace, are missing from this extract. */
gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
	fprintf(stderr,"ERROR: runtime built without Kafka support.\n");
#endif // KAFKA_ENABLED