1 From 1962e6128a4d86a7c54977577e1e4224cadbb5f7 Mon Sep 17 00:00:00 2001
2 From: Alexander Scheel <ascheel@redhat.com>
3 Date: Wed, 2 Aug 2017 15:11:49 -0400
4 Subject: [PATCH] [client] Switch to non-blocking sockets
6 Switch the gssproxy client library to non-blocking sockets, allowing
7 for timeout and retry operations. The client will automatically retry
8 both send() and recv() operations three times on ETIMEDOUT. If the
9 combined send() and recv() attempts exhaust the three-retry limit, ETIMEDOUT will be
10 exposed to the caller in the minor status.
12 Signed-off-by: Alexander Scheel <ascheel@redhat.com>
13 Reviewed-by: Simo Sorce <simo@redhat.com>
14 [rharwood@redhat.com: commit message cleanups, rebased]
15 Reviewed-by: Robbie Harwood <rharwood@redhat.com>
16 (cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
18 proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++---
19 1 file changed, 295 insertions(+), 22 deletions(-)
21 diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
22 index 2133618..dba23a6 100644
23 --- a/proxy/src/client/gpm_common.c
24 +++ b/proxy/src/client/gpm_common.c
29 +#include <sys/epoll.h>
31 +#include <sys/timerfd.h>
33 #define FRAGMENT_BIT (1 << 31)
35 +#define RESPONSE_TIMEOUT 15
36 +#define MAX_TIMEOUT_RETRY 3
41 @@ -20,6 +26,9 @@ struct gpm_ctx {
50 /* a single global struct is not particularly efficient,
51 @@ -39,6 +48,8 @@ static void gpm_init_once(void)
52 pthread_mutex_init(&gpm_global_ctx.lock, &attr);
54 gpm_global_ctx.fd = -1;
55 + gpm_global_ctx.epollfd = -1;
56 + gpm_global_ctx.timerfd = -1;
58 seedp = time(NULL) + getpid() + pthread_self();
59 gpm_global_ctx.next_xid = rand_r(&seedp);
60 @@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
61 struct sockaddr_un addr = {0};
67 ret = get_pipe_name(name);
68 @@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
72 + ret = fcntl(fd, F_GETFD, &flags);
78 + ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
84 ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
87 @@ -163,6 +187,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
88 return pthread_mutex_unlock(&gpmctx->lock);
91 +static void gpm_timer_close(struct gpm_ctx *gpmctx) {
92 + if (gpmctx->timerfd < 0) {
96 + close(gpmctx->timerfd);
97 + gpmctx->timerfd = -1;
100 +static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
102 + struct itimerspec its;
104 + if (gpmctx->timerfd >= 0) {
105 + gpm_timer_close(gpmctx);
108 + gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
109 + if (gpmctx->timerfd < 0) {
113 + its.it_interval.tv_sec = timeout_seconds;
114 + its.it_interval.tv_nsec = 0;
115 + its.it_value.tv_sec = timeout_seconds;
116 + its.it_value.tv_nsec = 0;
118 + ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
121 + gpm_timer_close(gpmctx);
128 +static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
129 + if (gpmctx->epollfd < 0) {
133 + close(gpmctx->epollfd);
134 + gpmctx->epollfd = -1;
137 +static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
138 + struct epoll_event ev;
141 + if (gpmctx->epollfd >= 0) {
142 + gpm_epoll_close(gpmctx);
145 + gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
146 + if (gpmctx->epollfd == -1) {
151 + ev.events = EPOLLIN;
152 + ev.data.fd = gpmctx->timerfd;
153 + ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev);
156 + gpm_epoll_close(gpmctx);
163 +static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
166 + struct epoll_event ev;
167 + struct epoll_event events[2];
168 + uint64_t timer_read;
170 + if (gpmctx->epollfd < 0) {
171 + ret = gpm_epoll_setup(gpmctx);
176 + ev.events = event_flags;
177 + ev.data.fd = gpmctx->fd;
178 + epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev);
179 + if (epoll_ret == -1) {
181 + gpm_epoll_close(gpmctx);
186 + epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
187 + } while (epoll_ret < 0 && errno == EINTR);
189 + if (epoll_ret < 0) {
190 + /* Error while waiting that isn't EINTR */
192 + gpm_epoll_close(gpmctx);
193 + } else if (epoll_ret == 0) {
194 + /* Shouldn't happen as timeout == -1; treat it like a timeout
197 + gpm_epoll_close(gpmctx);
198 + } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
199 + /* Got an event which is only our timer */
200 + ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
201 + if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
202 + /* In the case when reading from the timer failed, don't hide the
203 + * timer error behind ETIMEDOUT such that it isn't retried */
206 + /* If ret == 0, then we definitely timed out. Else, if ret == -1
207 + * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
208 + * edge case where epoll thinks the timer can be read, but it
209 + * would still block; treat it like a TIMEOUT and retry, as
210 + * nothing around us would handle EAGAIN from timer and retry
214 + gpm_epoll_close(gpmctx);
216 + /* If ret == 2, then we ignore the timerfd; that way if the next
217 + * operation cannot be performed immediately, we timeout and retry.
218 + * If ret == 1 and data.fd == gpmctx->fd, return 0. */
222 + epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
223 + if (epoll_ret == -1) {
224 + /* If we previously had an error, expose that error instead of
225 + * clobbering it with errno; else if no error, then assume it is
226 + * better to notify of the error deleting the event than it is
230 + gpm_epoll_close(gpmctx);
236 +static int gpm_retry_socket(struct gpm_ctx *gpmctx)
238 + gpm_epoll_close(gpmctx);
239 + gpm_close_socket(gpmctx);
240 + return gpm_open_socket(gpmctx);
243 /* must be called after the lock has been grabbed */
244 static int gpm_send_buffer(struct gpm_ctx *gpmctx,
245 char *buffer, uint32_t length)
246 @@ -183,8 +359,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
250 + ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
256 - wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
257 + wn = write(gpmctx->fd, &size, sizeof(uint32_t));
261 @@ -192,8 +373,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
263 /* reopen and retry once */
264 if (retry == false) {
265 - gpm_close_socket(gpmctx);
266 - ret = gpm_open_socket(gpmctx);
267 + ret = gpm_retry_socket(gpmctx);
271 @@ -208,9 +388,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
274 while (length > pos) {
275 - wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
276 + ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
281 + wn = write(gpmctx->fd, buffer + pos, length - pos);
283 - if (errno == EINTR) {
284 + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
288 @@ -231,7 +416,7 @@ done:
290 /* must be called after the lock has been grabbed */
291 static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
292 - char *buffer, uint32_t *length)
293 + char **buffer, uint32_t *length)
297 @@ -239,6 +424,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
301 + ret = gpm_epoll_wait(gpmctx, EPOLLIN);
307 rn = read(gpmctx->fd, &size, sizeof(uint32_t));
309 @@ -258,11 +448,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
313 + *buffer = malloc(*length);
314 + if (*buffer == NULL) {
320 while (*length > pos) {
321 - rn = read(gpmctx->fd, buffer + pos, *length - pos);
322 + ret = gpm_epoll_wait(gpmctx, EPOLLIN);
327 + rn = read(gpmctx->fd, *buffer + pos, *length - pos);
329 - if (errno == EINTR) {
330 + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
334 @@ -281,6 +482,7 @@ done:
336 /* on errors we can only close the fd and return */
337 gpm_close_socket(gpmctx);
338 + gpm_epoll_close(gpmctx);
342 @@ -309,6 +511,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
343 return &gpm_global_ctx;
346 +static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
347 + uint32_t send_length, char** recv_buffer,
348 + uint32_t *recv_length)
354 + ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
358 + for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
359 + /* send to proxy */
360 + ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
363 + /* No error, continue to recv */
364 + } else if (ret == ETIMEDOUT) {
365 + /* Close and reopen socket before trying again */
366 + ret = gpm_retry_socket(gpmctx);
371 + /* RETRY entire send */
378 + /* receive answer */
379 + ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
383 + } else if (ret == ETIMEDOUT) {
384 + /* Close and reopen socket before trying again */
385 + ret = gpm_retry_socket(gpmctx);
387 + /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
389 + recv_buffer = NULL;
403 OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
406 @@ -399,15 +658,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
410 - char buffer[MAX_RPC_SIZE];
412 + char *send_buffer = NULL;
413 + char *recv_buffer = NULL;
414 + uint32_t send_length;
415 + uint32_t recv_length;
418 bool sockgrab = false;
421 - xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
422 - xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
423 + send_buffer = malloc(MAX_RPC_SIZE);
424 + if (send_buffer == NULL)
427 + xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
429 memset(&msg, 0, sizeof(gp_rpc_msg));
430 msg.header.type = GP_RPC_CALL;
431 @@ -450,22 +714,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
435 - /* send to proxy */
436 - ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
440 + /* set send_length */
441 + send_length = xdr_getpos(&xdr_call_ctx);
443 - /* receive answer */
444 - ret = gpm_recv_buffer(gpmctx, buffer, &length);
446 + /* Send request, receive response with timeout */
447 + ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
453 /* release the lock */
454 gpm_release_sock(gpmctx);
457 + /* Create the reply context */
458 + xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
461 memset(&msg, 0, sizeof(gp_rpc_msg));
462 xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
463 @@ -489,12 +753,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
467 + gpm_timer_close(gpmctx);
468 + gpm_epoll_close(gpmctx);
471 gpm_release_sock(gpmctx);
473 xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);
474 xdr_destroy(&xdr_call_ctx);
475 - xdr_destroy(&xdr_reply_ctx);
477 + if (recv_buffer != NULL)
478 + xdr_destroy(&xdr_reply_ctx);