31049e55726ae011d7df1f583c179d8a8b122c17
[com/gs-lite.git] / src / lib / gscprts / rts_kafka.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6
7  http://www.apache.org/licenses/LICENSE-2.0
8
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
#include <time.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <zlib.h>
#include "errno.h"
#include "stdio.h"
#include "stdlib.h"
30
31
32 #include "gsconfig.h"
33 #include "gshub.h"
34 #include "gstypes.h"
35 #include "lapp.h"
36 #include "fta.h"
37 #include "packet.h"
38 #include "schemaparser.h"
39 #include "lfta/rts.h"
40
41 void fta_init(gs_sp_t device);
42 void rts_fta_process_packet(struct packet * p);
43 void rts_fta_done();
44
45
46 #ifdef KAFKA_ENABLED
47
48 #include <librdkafka/rdkafka.h>
49
time_t st_time;				// time processing started; used to report tuple rates

gs_uint32_t max_field_kafka = CSVELEMENTS;	// max number of fields the csv parser extracts

#define KAFKA_TIMEOUT 1000              // timeout value for getting next batch of records (in ms)

gs_sp_t dev;				// interface (device) name this source reads for

static int fd=-1;			// NOTE(review): appears unused in this file — confirm before removing
static struct packet cur_packet;	// reusable packet handed to rts_fta_process_packet
static gs_sp_t config_fname;		// kafka config file name ("kafkaconfig" iface property)
static gs_sp_t topics_fname;		// kafka topic list file name ("kafkatopics" iface property)
static gs_sp_t line;			// NOTE(review): line/len/line_len/lineend look unused here — confirm
static ssize_t len;
static size_t line_len;
static gs_uint32_t lineend=0;
static gs_uint8_t csvdel = ',';		// CSV delimiter ("csvseparator" iface property)
static gs_uint32_t verbose=0;		// nonzero enables progress messages on stderr
static gs_uint32_t startupdelay=0;	// seconds to service the msg queue before consuming

#define MAX_KAFKA_TOPICS 256		// capacity of topic_list

static rd_kafka_t *rk;			// kafka consumer handle
static rd_kafka_conf_t *conf;		// kafka configuration object
static rd_kafka_queue_t *rkqu = NULL;	// single queue all subscribed topics feed into
static rd_kafka_topic_t *topic_list[MAX_KAFKA_TOPICS];	// topics loaded from topics_fname
gs_uint32_t num_topics;			// number of valid entries in topic_list
77
78 #include "lfta/csv_parser.h"
79
80 static int read_topic_list (rd_kafka_t * rk, rd_kafka_queue_t *kqueue, rd_kafka_topic_t **topic_list, int max_topics, const char *fname) {
81         FILE *fp;
82         int line = 0;   
83         char buf[512];
84
85         if (!(fp = fopen(fname, "r"))) {
86                 fprintf(stderr, "Unable to open kafka topic list file %s\n", fname);
87                 return -1;
88         }
89
90         while (line < max_topics && fgets(buf, sizeof(buf), fp)) {
91                 strtok(buf, " \t\r\n");         // truncate the whitespace and end of line
92                 topic_list[line] = rd_kafka_topic_new(rk, buf, NULL);
93                 int r = rd_kafka_consume_start_queue(topic_list[line], 0, RD_KAFKA_OFFSET_END, kqueue);
94                 if (r == -1) {
95                         fprintf(stderr, "Unable to add topic %s to queue: %s\n", buf, rd_kafka_err2str(rd_kafka_last_error()));
96                         exit(1);
97                 }
98                 line++;
99         }
100         fclose(fp);
101
102         return line;
103 }
104
105
106 static int read_conf_file (rd_kafka_conf_t *conf, const char *fname) {
107         FILE *fp;
108         int line = 0;   
109         char buf[10240];
110         char errstr[512];
111
112         if (!(fp = fopen(fname, "r"))) {
113                 fprintf(stderr, "Unable to open kafka configuration file %s\n", fname);
114                 return -1;
115         }
116
117         while (fgets(buf, sizeof(buf), fp)) {
118                 char *s = buf;
119                 char *t;
120                 rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
121                 line++;
122
123                 while (isspace((int)*s))
124                         s++;
125                         if (!*s || *s == '#')
126                                 continue;
127
128                         if ((t = strchr(buf, '\n')))
129                                 *t = '\0';
130                         t = strchr(buf, '=');
131                         if (!t || t == s || !*(t+1)) {
132                                 fprintf(stderr, "Error reading kafka config file %s:%d: expected key=value\n", fname, line);
133                                 fclose(fp);
134                                 return -1;
135                         }
136                         *(t++) = '\0';
137
138                         // set config property
139                         r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));
140                         if (r == RD_KAFKA_CONF_OK)
141                                         continue;
142
143                         fprintf(stderr, "Unable set to set kafka configuration property %s:%d: %s=%s: %s\n", fname, line, s, t, errstr);
144                         fclose(fp);
145                         return -1;
146         }
147         fclose(fp);
148
149         return 0;
150 }
151
152 static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
153
154         if (rkmessage->err) {
155                 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
156                         // caught up with the data
157                         return;
158                 }
159                 return;
160         }
161         csv_parse_line(rkmessage->payload, rkmessage->len);
162         rts_fta_process_packet(&cur_packet);
163 }
164
165 static gs_retval_t kafka_replay_init(gs_sp_t device)
166 {
167         gs_sp_t verbosetmp;
168         gs_sp_t delaytmp;
169         gs_sp_t tempdel;
170
171         if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
172                 if (strncmp(verbosetmp,"TRUE",4)==0) {
173                         verbose=1;
174                         fprintf(stderr,"VERBOSE ENABLED\n");
175                 } else {
176                         fprintf(stderr,"VERBOSE DISABLED\n");
177                 }
178         }
179
180         if ((config_fname=get_iface_properties(device,(gs_sp_t)"kafkaconfig"))==0) {
181                 print_error((gs_sp_t)"kafka_replay_init::No \"kafkaconfig\" defined");
182                 exit(0);
183         }
184
185         if ((topics_fname=get_iface_properties(device,(gs_sp_t)"kafkatopics"))==0) {
186                 print_error((gs_sp_t)"kafka_replay_init::No \"kafkatopics\" defined");
187                 exit(0);
188         }       
189
190         tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
191         if (tempdel != 0 ) {
192                 csvdel = tempdel[0];
193                 csv_set_delim(csvdel);
194         }
195
196         if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
197                 if (verbose) {
198                                 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
199                 }
200                 startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
201         }
202
203         // set maximum field nubmer to be extracted by csv parser
204         csv_set_maxfield(max_field_kafka);
205
206         cur_packet.ptype=PTYPE_CSV;
207
208         char errstr[512];
209
210         // load Kafka configuration from config file
211         conf = rd_kafka_conf_new();
212         read_conf_file(conf, config_fname);
213
214         // create new Kafka handle using configuration settings
215         if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
216                 fprintf(stderr, "Unable to create new Kafka consumer: %s\n", errstr);
217                 exit(1);
218         }       
219
220         // load topic list fromt he file and setup a kafka queue to consume them
221         rkqu = rd_kafka_queue_new(rk);
222         num_topics = read_topic_list(rk, rkqu, topic_list, MAX_KAFKA_TOPICS, topics_fname);
223         if (!num_topics) {
224                 fprintf(stderr, "Empty list of Kafka topics\n");                
225         }
226
227         return 0;
228 }
229
230
231 static gs_retval_t kafka_process_input()
232 {
233         unsigned cnt = 0;
234         static unsigned totalcnt = 0;
235
236         gs_int32_t retval;
237         while(cnt < 50000) {                    // process up to 50000 tuples at a time
238                 retval = rd_kafka_consume_callback_queue(rkqu, KAFKA_TIMEOUT, msg_consume, NULL);
239                 if (retval == 0) return 0; // got a timeout so service message queue
240                 if (retval < 0) {
241                         // tear down kafka
242                         size_t i = 0;
243                         // stop consuming from topics
244                         for (i=0 ; i<num_topics ; ++i) {
245                                 int r = rd_kafka_consume_stop(topic_list[i], 0);
246                                 if (r == -1) {
247                                         fprintf(stderr, "Enable to stop consuming from topic %s\n", rd_kafka_err2str(rd_kafka_last_error()));
248                                 }
249                         }
250
251                         // destoy queue
252                         rd_kafka_queue_destroy(rkqu);
253
254                         // Destroy topics
255                         for (i=0 ; i<num_topics ; ++i) {
256                                 rd_kafka_topic_destroy(topic_list[i]);
257                         }
258
259                         // destroy Kafka handle
260                         rd_kafka_destroy(rk);
261
262                         // we signal that everything is done
263                         if (verbose)
264                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
265                         rts_fta_done();
266                         // now just service message queue until we get killed or loose connectivity
267                         while (1) {
268                                 fta_start_service(0); // service all waiting messages
269                                 usleep(1000); // sleep a millisecond
270                         }
271                 }
272                 cnt += retval;
273         }
274         totalcnt = totalcnt + cnt;
275         if (verbose) {
276                 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
277         }
278         return 0;
279 }
280
/*
 * Entry point for the Kafka-backed lfta runtime. Initializes the Kafka
 * consumer and host library, waits out the configured startup delay while
 * servicing the message queue, blocks until GSHUB signals that processing
 * may start, then loops forever alternating Kafka consumption with message
 * queue servicing. Exits the process (codes 7/8/9) on fatal errors;
 * the final return 0 is never reached.
 */
gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
	gs_uint32_t cont;
	endpoint mygshub;

    dev = device;

	kafka_replay_init(device);

	/* initialize host_lib */
	if (verbose) {
		fprintf(stderr,"Init LFTAs for %s\n",device);
	}

	if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
		fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
			device);
		exit(7);
	}

	fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/

	// set maximum field number to be extracted by csv parser
	csv_set_maxfield(max_field_kafka);

	// service the message queue for startupdelay seconds before consuming
	cont = startupdelay + time(0);

	if (verbose) { fprintf(stderr,"Start startup delay"); }

	while (cont > time(NULL)) {
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
			exit(9);
		}
		usleep(1000); /* sleep for one millisecond */
	}

	if (verbose) { fprintf(stderr,"... Done\n"); }

	// wait to process till we get the signal from GSHUB
	if (get_hub(&mygshub) != 0) {
		print_error((gs_sp_t)"ERROR:could not find gshub for data source");
		exit(0);
	}
	// poll GSHUB for the start signal, still servicing the message queue
	while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
		usleep(100);
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
			exit(9);
		}
	}

	/* now we enter an endless loop to process data */
	if (verbose) {
		fprintf(stderr,"Start processing %s\n",device);
	}

	st_time = time(NULL);	// baseline for the tuple-rate report in kafka_process_input
	while (1) {
		if (kafka_process_input() < 0) {
			fprintf(stderr,"%s::error:in processing records\n", device);
			exit(8);
		}
		/* process all messages on the message queue*/
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
			exit(9);
		}
	}

	return 0;
}
352
353 #else                   
354 //              This is a stub entry point to ensure proper linking when Kafka support is not enabled
355 gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
356         fprintf(stderr,"ERROR: runtime built without Kafka support.\n");
357         exit(1);
358
359         return 0;
360 }
361
362 #endif                  // KAFKA_ENABLED
363