Improvements to aggregation code and function library
[com/gs-lite.git] / src / lib / gscprts / rts_kafka.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6
7  http://www.apache.org/licenses/LICENSE-2.0
8
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15
#include <time.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <zlib.h>
#include "errno.h"
#include "stdio.h"
#include "stdlib.h"
30
31
32 #include "gsconfig.h"
33 #include "gshub.h"
34 #include "gstypes.h"
35 #include "lapp.h"
36 #include "fta.h"
37 #include "packet.h"
38 #include "schemaparser.h"
39 #include "lfta/rts.h"
40
41 void fta_init(gs_sp_t device);
42 void rts_fta_process_packet(struct packet * p);
43 void rts_fta_done();
44
45
46 #ifdef KAFKA_ENABLED
47
48 #include <librdkafka/rdkafka.h>
49
50 time_t st_time;
51
52 gs_uint32_t max_field_kafka = CSVELEMENTS;
53
54 #define KAFKA_TIMEOUT 1000              // timeout value for getting next batach of records (in ms)
55
56 gs_sp_t dev;
57
58 static int fd=-1;
59 static struct packet cur_packet;
60 static gs_sp_t config_fname;
61 static gs_sp_t topics_fname;
62 static gs_sp_t line;
63 static ssize_t len;
64 static size_t line_len;
65 static gs_uint32_t lineend=0;
66 static gs_uint8_t csvdel = ',';
67 static gs_uint32_t verbose=0;
68 static gs_uint32_t startupdelay=0;
69
70 #define MAX_KAFKA_TOPICS 256
71
72 static rd_kafka_t *rk;
73 static rd_kafka_conf_t *conf;
74 static rd_kafka_queue_t *rkqu = NULL;
75 static rd_kafka_topic_t *topic_list[MAX_KAFKA_TOPICS];
76 gs_uint32_t num_topics;
77
78 #include "lfta/csv_parser.h"
79
80 static int read_topic_list (rd_kafka_t * rk, rd_kafka_queue_t *kqueue, rd_kafka_topic_t **topic_list, int max_topics, const char *fname) {
81         FILE *fp;
82         int line = 0;   
83         char buf[512];
84
85         if (!(fp = fopen(fname, "r"))) {
86                 fprintf(stderr, "Unable to open kafka topic list file %s\n", fname);
87                 return -1;
88         }
89
90         while (line < max_topics && fgets(buf, sizeof(buf), fp)) {
91                 strtok(buf, " \t\r\n");         // truncate the whitespace and end of line
92                 topic_list[line] = rd_kafka_topic_new(rk, buf, NULL);
93                 int r = rd_kafka_consume_start_queue(topic_list[line], 0, RD_KAFKA_OFFSET_END, kqueue);
94                 if (r == -1) {
95                         fprintf(stderr, "Unable to add topic %s to queue: %s\n", buf, rd_kafka_err2str(rd_kafka_last_error()));
96                         exit(1);
97                 }
98                 line++;
99         }
100         fclose(fp);
101
102         return line;
103 }
104
105
106 static int read_conf_file (rd_kafka_conf_t *conf, const char *fname) {
107         FILE *fp;
108         int line = 0;   
109         char buf[10240];
110         char errstr[512];
111
112         if (!(fp = fopen(fname, "r"))) {
113                 fprintf(stderr, "Unable to open kafka configuration file %s\n", fname);
114                 return -1;
115         }
116
117         while (fgets(buf, sizeof(buf), fp)) {
118                 char *s = buf;
119                 char *t;
120                 rd_kafka_conf_res_t r = RD_KAFKA_CONF_UNKNOWN;
121                 line++;
122
123                 while (isspace((int)*s))
124                         s++;
125                         if (!*s || *s == '#')
126                                 continue;
127
128                         if ((t = strchr(buf, '\n')))
129                                 *t = '\0';
130                         t = strchr(buf, '=');
131                         if (!t || t == s || !*(t+1)) {
132                                 fprintf(stderr, "Error reading kafka config file %s:%d: expected key=value\n", fname, line);
133                                 fclose(fp);
134                                 return -1;
135                         }
136                         *(t++) = '\0';
137
138                         // set config property
139                         r = rd_kafka_conf_set(conf, s, t, errstr, sizeof(errstr));
140                         if (r == RD_KAFKA_CONF_OK)
141                                         continue;
142
143                         fprintf(stderr, "Unable set to set kafka configuration property %s:%d: %s=%s: %s\n", fname, line, s, t, errstr);
144                         fclose(fp);
145                         return -1;
146         }
147         fclose(fp);
148
149         return 0;
150 }
151
152 static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
153
154         if (rkmessage->err) {
155                 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
156                         // caught up with the data
157                         return;
158                 }
159                 return;
160         }
161         csv_parse_line(rkmessage->payload, rkmessage->len);
162         rts_fta_process_packet(&cur_packet);
163 }
164
165 static gs_retval_t kafka_replay_init(gs_sp_t device)
166 {
167         gs_sp_t verbosetmp;
168         gs_sp_t delaytmp;
169         gs_sp_t tempdel;
170         gs_sp_t maxfieldtmp;    
171
172         if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
173                 if (strncmp(verbosetmp,"TRUE",4)==0) {
174                         verbose=1;
175                         fprintf(stderr,"VERBOSE ENABLED\n");
176                 } else {
177                         fprintf(stderr,"VERBOSE DISABLED\n");
178                 }
179         }
180
181         if ((config_fname=get_iface_properties(device,(gs_sp_t)"kafkaconfig"))==0) {
182                 print_error((gs_sp_t)"kafka_replay_init::No \"kafkaconfig\" defined");
183                 exit(0);
184         }
185
186         if ((topics_fname=get_iface_properties(device,(gs_sp_t)"kafkatopics"))==0) {
187                 print_error((gs_sp_t)"kafka_replay_init::No \"kafkatopics\" defined");
188                 exit(0);
189         }       
190
191         tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
192         if (tempdel != 0 ) {
193                 csvdel = tempdel[0];
194                 csv_set_delim(csvdel);
195         }
196
197         if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
198                 if (verbose) {
199                                 fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
200                 }
201                 startupdelay=atoi(delaytmp);
202         }
203
204         if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
205                 max_field_kafka=atoi(maxfieldtmp);
206         }       
207
208         // set maximum field nubmer to be extracted by csv parser
209         csv_set_maxfield(max_field_kafka);
210
211         cur_packet.ptype=PTYPE_CSV;
212
213         char errstr[512];
214
215         // load Kafka configuration from config file
216         conf = rd_kafka_conf_new();
217         read_conf_file(conf, config_fname);
218
219         // create new Kafka handle using configuration settings
220         if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
221                 fprintf(stderr, "Unable to create new Kafka consumer: %s\n", errstr);
222                 exit(1);
223         }       
224
225         // load topic list fromt he file and setup a kafka queue to consume them
226         rkqu = rd_kafka_queue_new(rk);
227         num_topics = read_topic_list(rk, rkqu, topic_list, MAX_KAFKA_TOPICS, topics_fname);
228         if (!num_topics) {
229                 fprintf(stderr, "Empty list of Kafka topics\n");                
230         }
231
232         return 0;
233 }
234
235
236 static gs_retval_t kafka_process_input()
237 {
238         unsigned cnt = 0;
239         static unsigned totalcnt = 0;
240
241         gs_int32_t retval;
242         while(cnt < 50000) {                    // process up to 50000 tuples at a time
243                 retval = rd_kafka_consume_callback_queue(rkqu, KAFKA_TIMEOUT, msg_consume, NULL);
244                 if (retval == 0) return 0; // got a timeout so service message queue
245                 if (retval < 0) {
246                         // tear down kafka
247                         size_t i = 0;
248                         // stop consuming from topics
249                         for (i=0 ; i<num_topics ; ++i) {
250                                 int r = rd_kafka_consume_stop(topic_list[i], 0);
251                                 if (r == -1) {
252                                         fprintf(stderr, "Enable to stop consuming from topic %s\n", rd_kafka_err2str(rd_kafka_last_error()));
253                                 }
254                         }
255
256                         // destoy queue
257                         rd_kafka_queue_destroy(rkqu);
258
259                         // Destroy topics
260                         for (i=0 ; i<num_topics ; ++i) {
261                                 rd_kafka_topic_destroy(topic_list[i]);
262                         }
263
264                         // destroy Kafka handle
265                         rd_kafka_destroy(rk);
266
267                         // we signal that everything is done
268                         if (verbose)
269                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
270                         rts_fta_done();
271                         // now just service message queue until we get killed or loose connectivity
272                         while (1) {
273                                 fta_start_service(0); // service all waiting messages
274                                 usleep(1000); // sleep a millisecond
275                         }
276                 }
277                 cnt += retval;
278         }
279         totalcnt = totalcnt + cnt;
280         if (verbose) {
281                 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
282         }
283         return 0;
284 }
285
// Entry point for the kafka-backed runtime: initializes the kafka consumer and
// the host library, services the control message queue through the configured
// startup delay, waits for the GSHUB start-processing signal, then loops
// forever alternating kafka tuple batches with message-queue servicing.
// Only returns via exit(); error exits use distinct codes (7/8/9).
gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
	gs_uint32_t cont;
	endpoint mygshub;

    dev = device;

	kafka_replay_init(device);

	/* initialize host_lib */
	if (verbose) {
		fprintf(stderr,"Init LFTAs for %s\n",device);
	}

	if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
		fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
			device);
		exit(7);
	}

	fta_init(device); /* NOTE(review): fta_init has no error return to check */

	// set maximum field number to be extracted by csv parser
	csv_set_maxfield(max_field_kafka);

	// absolute wall-clock time at which the startup delay expires
	cont = startupdelay + time(0);

	// no '\n' on purpose: "... Done" below completes the same line
	if (verbose) { fprintf(stderr,"Start startup delay"); }

	// keep the control message queue serviced while the startup delay runs
	while (cont > time(NULL)) {
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
			exit(9);
		}
		usleep(1000); /* sleep for one millisecond */
	}

	if (verbose) { fprintf(stderr,"... Done\n"); }

	// wait to process till we get the signal from GSHUB,
	// still servicing the message queue while we poll
	if (get_hub(&mygshub) != 0) {
		print_error((gs_sp_t)"ERROR:could not find gshub for data source");
		exit(0);
	}
	while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
		usleep(100);
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
			exit(9);
		}
	}

	/* now we enter an endless loop to process data */
	if (verbose) {
		fprintf(stderr,"Start processing %s\n",device);
	}

	// st_time anchors the tuple-rate report printed by kafka_process_input()
	st_time = time(NULL);
	while (1) {
		if (kafka_process_input() < 0) {
			fprintf(stderr,"%s::error:in processing records\n", device);
			exit(8);
		}
		/* process all messages on the message queue*/
		if (fta_start_service(0) < 0) {
			fprintf(stderr,"%s::error:in processing the msg queue\n", device);
			exit(9);
		}
	}

	return 0;
}
357
358 #else                   
359 //              This is a stub entry point to ensure proper linking when Kafka support is not enabled
360 gs_retval_t main_kafka(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
361         fprintf(stderr,"ERROR: runtime built without Kafka support.\n");
362         exit(1);
363
364         return 0;
365 }
366
367 #endif                  // KAFKA_ENABLED
368