Signed-off-by: Angus Salkeld asalkeld@redhat.com --- lib/ipc_int.h | 2 + lib/ipcs.c | 178 ++++++++++++++++++++++++++++++++++++--------------------- 2 files changed, 115 insertions(+), 65 deletions(-)
diff --git a/lib/ipc_int.h b/lib/ipc_int.h index e609302..ed36ca8 100644 --- a/lib/ipc_int.h +++ b/lib/ipc_int.h @@ -178,6 +178,8 @@ struct qb_ipcs_connection { char *receive_buf; void *context; int32_t fc_enabled; + int32_t poll_events; + int32_t outstanding_notifiers; struct qb_ipcs_connection_stats stats; };
diff --git a/lib/ipcs.c b/lib/ipcs.c index 37d950b..5fd014c 100644 --- a/lib/ipcs.c +++ b/lib/ipcs.c @@ -105,22 +105,49 @@ int32_t qb_ipcs_run(struct qb_ipcs_service* s) return res; }
-void qb_ipcs_request_rate_limit(struct qb_ipcs_service* s, enum qb_ipcs_rate_limit rl) +static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c) +{ + if (c->service->type == QB_IPC_POSIX_MQ + && !c->service->needs_sock_for_poll) { + return c->service->poll_fns.dispatch_mod(c->service-> + poll_priority, + (int32_t) c->request.u. + pmq.q, c->poll_events, + c, + qb_ipcs_dispatch_service_request); + } else if (c->service->type == QB_IPC_SOCKET) { + return c->service->poll_fns.dispatch_mod(c->service-> + poll_priority, + c->event.u.us.sock, + c->poll_events, c, + qb_ipcs_dispatch_connection_request); + } else { + return c->service->poll_fns.dispatch_mod(c->service-> + poll_priority, + c->setup.u.us.sock, + c->poll_events, c, + qb_ipcs_dispatch_connection_request); + } + return -EINVAL; +} + +void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s, + enum qb_ipcs_rate_limit rl) { struct qb_ipcs_connection *c; - enum qb_loop_priority p; + enum qb_loop_priority old_p = s->poll_priority;
switch (rl) { case QB_IPCS_RATE_FAST: - p = QB_LOOP_HIGH; + s->poll_priority = QB_LOOP_HIGH; break; case QB_IPCS_RATE_SLOW: case QB_IPCS_RATE_OFF: - p = QB_LOOP_LOW; + s->poll_priority = QB_LOOP_LOW; break; default: case QB_IPCS_RATE_NORMAL: - p = QB_LOOP_MED; + s->poll_priority = QB_LOOP_MED; break; }
@@ -128,29 +155,14 @@ void qb_ipcs_request_rate_limit(struct qb_ipcs_service* s, enum qb_ipcs_rate_lim qb_ipcs_connection_ref_inc(c);
qb_ipcs_flowcontrol_set(c, (rl == QB_IPCS_RATE_OFF)); - if (s->poll_priority == p) { + if (old_p == s->poll_priority) { qb_ipcs_connection_ref_dec(c); continue; }
- if (s->type == QB_IPC_POSIX_MQ && !s->needs_sock_for_poll) { - (void)s->poll_fns.dispatch_mod(p, (int32_t)c->request.u.pmq.q, - POLLIN | POLLPRI | POLLNVAL, - c, qb_ipcs_dispatch_service_request); - } else if (s->type == QB_IPC_SOCKET) { - (void)s->poll_fns.dispatch_mod(p, c->event.u.us.sock, - POLLIN | POLLPRI | POLLNVAL, - c, - qb_ipcs_dispatch_connection_request); - } else { - (void)s->poll_fns.dispatch_mod(p, c->setup.u.us.sock, - POLLIN | POLLPRI | POLLNVAL, - c, - qb_ipcs_dispatch_connection_request); - } + (void)_modify_dispatch_descriptor_(c); qb_ipcs_connection_ref_dec(c); } - s->poll_priority = p; }
void qb_ipcs_ref(struct qb_ipcs_service *s) @@ -223,76 +235,102 @@ ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct iovec return res; }
-ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void *data, +static int32_t send_event_notification(int32_t fd, int32_t revents, void *data) +{ + ssize_t res = 0; + struct qb_ipcs_connection *c = data; + + if (c->outstanding_notifiers > 0) { + res = qb_ipc_us_send(&c->setup, data, c->outstanding_notifiers); + } + if (res > 0) { + c->outstanding_notifiers -= res; + } + if (c->outstanding_notifiers > 0) { + return 0; + } else { + c->outstanding_notifiers = 0; + c->poll_events = POLLIN; + (void)_modify_dispatch_descriptor_(c); + } + return 0; +} + +ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size) { ssize_t res; ssize_t res2 = 0; - int32_t try_count = 0;
qb_ipcs_connection_ref_inc(c);
- do { - try_count++; - res = c->service->funcs.send(&c->event, data, size); - if (res == size) { - c->stats.events++; - } else if (res == -EAGAIN) { - c->stats.send_retries++; - } - } while (res == -EAGAIN && try_count < 20); - if (res > 0) { - if (c->service->needs_sock_for_poll) { - do { - res2 = qb_ipc_us_send(&c->setup, &res, 1); - } while (res2 == -EAGAIN); + res = c->service->funcs.send(&c->event, data, size); + if (res != size) { + goto deref_and_return; + } + c->stats.events++; + if (c->service->needs_sock_for_poll) { + if (c->outstanding_notifiers > 0) { + c->outstanding_notifiers++; + } else { + res2 = qb_ipc_us_send(&c->setup, data, 1); + if (res2 == 1) { + goto deref_and_return; + } + /* + * notify the client later, when we can. + */ + c->outstanding_notifiers++; + c->poll_events = POLLOUT | POLLIN; + (void)_modify_dispatch_descriptor_(c); } - } else if (res != -EAGAIN) { - qb_util_log(LOG_ERR, - "failed to send event : %s", - strerror(-res)); } + +deref_and_return: + qb_ipcs_connection_ref_dec(c);
return res; }
- -ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct iovec * iov, size_t iov_len) +ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c, + const struct iovec * iov, size_t iov_len) { ssize_t res; ssize_t res2; - int32_t try_count = 0;
qb_ipcs_connection_ref_inc(c);
- do { - try_count++; - res = c->service->funcs.sendv(&c->event, iov, iov_len); - if (res > 0) { - c->stats.events++; - } else if (res == -EAGAIN) { - c->stats.send_retries++; - } - } while (res == -EAGAIN && try_count < 20); - if (res > 0) { - if (c->service->needs_sock_for_poll) { - do { - res2 = qb_ipc_us_send(&c->setup, &res, 1); - } while (res2 == -EAGAIN); + res = c->service->funcs.sendv(&c->event, iov, iov_len); + if (res < 0) { + goto deref_and_return; + } + c->stats.events++; + if (c->service->needs_sock_for_poll) { + if (c->outstanding_notifiers > 0) { + c->outstanding_notifiers++; + } else { + res2 = qb_ipc_us_send(&c->setup, res, 1); + if (res2 == 1) { + goto deref_and_return; + } + /* + * notify the client later, when we can. + */ + c->outstanding_notifiers++; + c->poll_events = POLLOUT | POLLIN; + (void)_modify_dispatch_descriptor_(c); } - } else if (res != -EAGAIN) { - qb_util_log(LOG_ERR, - "failed to send event : %s", - strerror(-res)); }
+deref_and_return: + qb_ipcs_connection_ref_dec(c);
return res; }
-qb_ipcs_connection_t * qb_ipcs_connection_first_get(struct qb_ipcs_service* s) +qb_ipcs_connection_t *qb_ipcs_connection_first_get(struct qb_ipcs_service * s) { struct qb_ipcs_connection *c; struct qb_list_head *iter; @@ -523,6 +561,16 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, qb_ipcs_disconnect(c); return -ESHUTDOWN; } + + if (revents & POLLOUT) { + res = send_event_notification(fd, revents, data); + if ((revents & POLLIN) == 0) { + return 0; + } + } + if (c->fc_enabled) { + return 0; + } avail = _request_q_len_get(c); do { res = _process_request_(c, IPC_REQUEST_TIMEOUT); @@ -532,7 +580,7 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, if (res > 0) { avail--; } - } while (avail > 0 && res > 0); + } while (avail > 0 && res > 0 && !c->fc_enabled);
if (c->service->needs_sock_for_poll && recvd > 0) { (void)qb_ipc_us_recv(&c->setup, bytes, recvd, 0);