Also allow the ringbuffer to pass ETIMEDOUT back to the client applications.
Signed-off-by: Angus Salkeld asalkeld@redhat.com --- include/qb/qbipcc.h | 9 ++++----- lib/ipc_shm.c | 15 ++------------- lib/ipc_us.c | 4 ++-- lib/ipcc.c | 47 ++++++++++++++++------------------------------- lib/ipcs.c | 18 ++++++++++++------ tests/bmc.c | 12 ++---------- tests/bmcpt.c | 21 ++++++++------------- tests/check_ipc.c | 21 ++++++++++----------- 8 files changed, 56 insertions(+), 91 deletions(-)
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h index 1fa892c..b6f94a8 100644 --- a/include/qb/qbipcc.h +++ b/include/qb/qbipcc.h @@ -113,7 +113,7 @@ ssize_t qb_ipcc_sendv(qb_ipcc_connection_t* c, const struct iovec* iov, * @return (size recv'ed, -errno == error) */ ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr, - size_t msg_len); + size_t msg_len, int32_t ms_timeout);
/** * This is a convenience function that simply sends and then recvs. @@ -127,10 +127,9 @@ ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr, * @see qb_ipcc_sendv() qb_ipcc_recv() */ ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c, - const struct iovec *iov, - unsigned int iov_len, - void *msg_ptr, - size_t msg_len); + const struct iovec *iov, uint32_t iov_len, + void *msg_ptr, size_t msg_len, + int32_t ms_timeout);
/** * Receive an event. diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c index 1b8fd6e..55670dc 100644 --- a/lib/ipc_shm.c +++ b/lib/ipc_shm.c @@ -86,34 +86,23 @@ static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way, size_t msg_len, int32_t ms_timeout) { - ssize_t res; if (one_way->u.shm.rb == NULL) { return -ENOTCONN; } - res = qb_rb_chunk_read(one_way->u.shm.rb, + return qb_rb_chunk_read(one_way->u.shm.rb, (void *)msg_ptr, msg_len, ms_timeout); - if (res == -ETIMEDOUT) { - return -EAGAIN; - } - return res; }
static ssize_t qb_ipc_shm_peek(struct qb_ipc_one_way *one_way, void **data_out, int32_t ms_timeout) { - ssize_t res; - if (one_way->u.shm.rb == NULL) { return -ENOTCONN; } - res = qb_rb_chunk_peek(one_way->u.shm.rb, + return qb_rb_chunk_peek(one_way->u.shm.rb, data_out, ms_timeout); - if (res == -ETIMEDOUT) { - return -EAGAIN; - } - return res; }
static void qb_ipc_shm_reclaim(struct qb_ipc_one_way *one_way) diff --git a/lib/ipc_us.c b/lib/ipc_us.c index 14da2dd..3fe0ddd 100644 --- a/lib/ipc_us.c +++ b/lib/ipc_us.c @@ -227,7 +227,7 @@ ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way,
retry_recv: result = recv(one_way->u.us.sock, msg, len, MSG_NOSIGNAL | MSG_WAITALL); - if (result == -1 && errno == EAGAIN) { + if (timeout == -1 && result == -1 && errno == EAGAIN) { goto retry_recv; } if (result == -1) { @@ -318,7 +318,7 @@ int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, return res; }
- res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), 0); + res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), -1); if (res < 0) { return res; } diff --git a/lib/ipcc.c b/lib/ipcc.c index 5fb8039..8248cdd 100644 --- a/lib/ipcc.c +++ b/lib/ipcc.c @@ -131,55 +131,40 @@ ssize_t qb_ipcc_sendv(struct qb_ipcc_connection* c, const struct iovec* iov, }
ssize_t qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr, - size_t msg_len) + size_t msg_len, int32_t ms_timeout) { int32_t res = 0; - int32_t retries = 0; - - recv_retry: - retries++; - res = c->funcs.recv(&c->response, msg_ptr, msg_len, 100); - if (res == -EAGAIN && c->needs_sock_for_poll) { - res = qb_ipc_us_recv_ready(&c->setup, 0); - if (res == -EAGAIN && retries < 50) { - goto recv_retry; - } else if (res < 0) { - return res; + int32_t res2 = 0; + + res = c->funcs.recv(&c->response, msg_ptr, msg_len, ms_timeout); + if ((res == -EAGAIN || res == -ETIMEDOUT) && c->needs_sock_for_poll) { + res2 = qb_ipc_us_recv_ready(&c->setup, 0); + if (res2 < 0) { + return res2; } else { - return -EAGAIN; + return res; } } return res; }
-ssize_t qb_ipcc_sendv_recv ( - qb_ipcc_connection_t *c, - const struct iovec *iov, - unsigned int iov_len, - void *res_msg, - size_t res_len) +ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c, + const struct iovec *iov, uint32_t iov_len, + void *res_msg, size_t res_len, + int32_t ms_timeout) { - ssize_t res; + ssize_t res = 0;
if (c->funcs.fc_get && c->funcs.fc_get(&c->request)) { return -EAGAIN; }
-repeat_send: res = qb_ipcc_sendv(c, iov, iov_len); if (res < 0) { - if (res == -EAGAIN) { - goto repeat_send; - } return res; }
-repeat_recv: - res = qb_ipcc_recv(c, res_msg, res_len); - if (res == -EAGAIN) { - goto repeat_recv; - } - return res; + return qb_ipcc_recv(c, res_msg, res_len, ms_timeout); }
int32_t qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd) @@ -217,7 +202,7 @@ ssize_t qb_ipcc_event_recv(struct qb_ipcc_connection * c, void *msg_pt, return size; } if (c->needs_sock_for_poll) { - res = qb_ipc_us_recv(&c->setup, &one_byte, 1, 0); + res = qb_ipc_us_recv(&c->setup, &one_byte, 1, -1); if (res < 0) { return res; } diff --git a/lib/ipcs.c b/lib/ipcs.c index 51a3164..2c615ba 100644 --- a/lib/ipcs.c +++ b/lib/ipcs.c @@ -211,7 +211,7 @@ ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data, res = c->service->funcs.send(&c->response, data, size); if (res == size) { c->stats.responses++; - } else if (res == -EAGAIN) { + } else if (res == -EAGAIN || res == -ETIMEDOUT) { c->stats.send_retries++; } qb_ipcs_connection_unref(c); @@ -227,7 +227,7 @@ ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct iovec res = c->service->funcs.sendv(&c->response, iov, iov_len); if (res > 0) { c->stats.responses++; - } else if (res == -EAGAIN) { + } else if (res == -EAGAIN || res == -ETIMEDOUT) { c->stats.send_retries++; } qb_ipcs_connection_unref(c); @@ -481,7 +481,7 @@ static int32_t _process_request_(struct qb_ipcs_connection *c, ms_timeout); } if (size < 0) { - if (size != -EAGAIN) { + if (size != -EAGAIN && size != -ETIMEDOUT) { qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror(-res)); } else { c->stats.recv_retries++; @@ -573,6 +573,12 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, return 0; } avail = _request_q_len_get(c); + + if (c->service->needs_sock_for_poll && avail == 0) { + (void)qb_ipc_us_recv(&c->setup, bytes, 1, 0); + return 0; + } + res = avail; /* in case error */ do { res = _process_request_(c, IPC_REQUEST_TIMEOUT); @@ -585,15 +591,15 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, } while (avail > 0 && res > 0 && !c->fc_enabled);
if (c->service->needs_sock_for_poll && recvd > 0) { - (void)qb_ipc_us_recv(&c->setup, bytes, recvd, 0); + (void)qb_ipc_us_recv(&c->setup, bytes, recvd, -1); }
res = QB_MIN(0, res); - if (res == -EAGAIN || res == -ENOBUFS) { + if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) { res = 0; } if (res != 0) { - qb_util_log(LOG_INFO, "%s returning %d : %s", + qb_util_log(LOG_DEBUG, "%s returning %d : %s", __func__, res, strerror(-res)); qb_ipcs_connection_unref(c); } diff --git a/tests/bmc.c b/tests/bmc.c index 5eafd56..e49d0e6 100644 --- a/tests/bmc.c +++ b/tests/bmc.c @@ -99,13 +99,9 @@ repeat_send: }
if (blocking) { - repeat_recv: res = qb_ipcc_recv(conn, &res_header, - sizeof(struct qb_ipc_response_header)); - if (res == -EAGAIN) { - goto repeat_recv; - } + sizeof(struct qb_ipc_response_header), -1); if (res == -EINTR) { return -1; } @@ -117,13 +113,9 @@ repeat_send: assert(res_header.size == sizeof(struct qb_ipc_response_header)); } if (events) { - repeat_event_recv: res = qb_ipcc_event_recv(conn, &res_header, - sizeof(struct qb_ipc_response_header), 0); - if (res == -EAGAIN) { - goto repeat_event_recv; - } + sizeof(struct qb_ipc_response_header), -1); if (res == -EINTR) { return -1; } diff --git a/tests/bmcpt.c b/tests/bmcpt.c index e0d4fa0..b7b1369 100644 --- a/tests/bmcpt.c +++ b/tests/bmcpt.c @@ -123,19 +123,14 @@ repeat_send: } }
- repeat_recv: - res = qb_ipcc_recv(ctx->conn, - &res_header, - sizeof(struct qb_ipc_response_header)); - if (res == -EAGAIN) { - goto repeat_recv; - } - if (res == -EINTR) { - return -1; - } - if (res < 0) { - perror("qb_ipcc_recv"); - } + res = qb_ipcc_recv(ctx->conn, &res_header, + sizeof(struct qb_ipc_response_header), -1); + if (res == -EINTR) { + return -1; + } + if (res < 0) { + perror("qb_ipcc_recv"); + } assert(res == sizeof(struct qb_ipc_response_header)); assert(res_header.id == 13); assert(res_header.size == sizeof(struct qb_ipc_response_header)); diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 539e7ee..fcdf91a 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -111,6 +111,7 @@ static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, perror("qb_ipcs_event_send"); } } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { + qb_ipcs_destroy(s1); exit(0); } return 0; @@ -229,13 +230,8 @@ repeat_send: } }
- repeat_recv: - res = qb_ipcc_recv(conn, - &res_header, - sizeof(struct qb_ipc_response_header)); - if (res == -EAGAIN) { - goto repeat_recv; - } + res = qb_ipcc_recv(conn, &res_header, + sizeof(struct qb_ipc_response_header), -1); if (res == -EINTR) { return -1; } @@ -371,7 +367,7 @@ static void test_ipc_dispatch(void) repeat_event_recv: res = qb_ipcc_event_recv(conn, res_header, IPC_BUF_SIZE, 0); if (res < 0) { - if (res == -EAGAIN) { + if (res == -EAGAIN || res == -ETIMEDOUT) { goto repeat_event_recv; } else { errno = -res; @@ -442,11 +438,14 @@ static void test_ipc_server_fail(void) }
/* + * wait a bit for the server to die. + */ + sleep(1); + /* * try recv from the exit'ed server */ - res = qb_ipcc_recv(conn, - &res_header, - sizeof(struct qb_ipc_response_header)); + res = qb_ipcc_recv(conn, &res_header, + sizeof(struct qb_ipc_response_header), 100); /* * confirm we get -ENOTCONN */
Signed-off-by: Angus Salkeld asalkeld@redhat.com --- tests/check_ipc.c | 52 ++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 42 insertions(+), 10 deletions(-)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c index fcdf91a..be8681c 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -203,7 +203,7 @@ static int32_t stop_process(pid_t pid)
#define IPC_BUF_SIZE (1024 * 1024) static char buffer[IPC_BUF_SIZE]; -static int32_t send_and_check(uint32_t size) +static int32_t send_and_check(uint32_t size, int32_t ms_timeout) { struct qb_ipc_request_header *req_header = (struct qb_ipc_request_header *)buffer; struct qb_ipc_response_header res_header; @@ -229,18 +229,22 @@ repeat_send: return res; } } - + repeat_recv: res = qb_ipcc_recv(conn, &res_header, - sizeof(struct qb_ipc_response_header), -1); + sizeof(struct qb_ipc_response_header), ms_timeout); if (res == -EINTR) { return -1; } + if (res == -EAGAIN || res == -ETIMEDOUT) { + goto repeat_recv; + } ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header)); ck_assert_int_eq(res_header.id, IPC_MSG_RES_TX_RX); ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header)); return 0; }
+static int32_t recv_timeout = -1; static void test_ipc_txrx(void) { int32_t j; @@ -268,7 +272,7 @@ static void test_ipc_txrx(void) size *= 2; if (size >= MAX_MSG_SIZE) break; - if (send_and_check(size) < 0) { + if (send_and_check(size, recv_timeout) < 0) { break; } } @@ -279,16 +283,34 @@ static void test_ipc_txrx(void) stop_process(pid); }
-START_TEST(test_ipc_txrx_shm) +START_TEST(test_ipc_txrx_shm_tmo) +{ + ipc_type = QB_IPC_SHM; + recv_timeout = 1000; + test_ipc_txrx(); +} +END_TEST + +START_TEST(test_ipc_txrx_shm_block) { ipc_type = QB_IPC_SHM; + recv_timeout = -1; + test_ipc_txrx(); +} +END_TEST + +START_TEST(test_ipc_txrx_us_block) +{ + ipc_type = QB_IPC_SOCKET; + recv_timeout = -1; test_ipc_txrx(); } END_TEST
-START_TEST(test_ipc_txrx_us) +START_TEST(test_ipc_txrx_us_tmo) { ipc_type = QB_IPC_SOCKET; + recv_timeout = 1000; test_ipc_txrx(); } END_TEST @@ -485,13 +507,23 @@ static Suite *ipc_suite(void) tcase_set_timeout(tc, 6); suite_add_tcase(s, tc);
- tc = tcase_create("ipc_txrx_shm"); - tcase_add_test(tc, test_ipc_txrx_shm); + tc = tcase_create("ipc_txrx_shm_block"); + tcase_add_test(tc, test_ipc_txrx_shm_block); + tcase_set_timeout(tc, 6); + suite_add_tcase(s, tc); + + tc = tcase_create("ipc_txrx_shm_tmo"); + tcase_add_test(tc, test_ipc_txrx_shm_tmo); + tcase_set_timeout(tc, 6); + suite_add_tcase(s, tc); + + tc = tcase_create("ipc_txrx_us_block"); + tcase_add_test(tc, test_ipc_txrx_us_block); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc);
- tc = tcase_create("ipc_txrx_us"); - tcase_add_test(tc, test_ipc_txrx_us); + tc = tcase_create("ipc_txrx_us_tmo"); + tcase_add_test(tc, test_ipc_txrx_us_tmo); tcase_set_timeout(tc, 6); suite_add_tcase(s, tc);
Really handy to see if the peer is "connected".
Signed-off-by: Angus Salkeld asalkeld@redhat.com --- include/qb/qbrb.h | 8 ++++++++ lib/ringbuffer.c | 6 ++++++ 2 files changed, 14 insertions(+), 0 deletions(-)
diff --git a/include/qb/qbrb.h b/include/qb/qbrb.h index b57f2b3..b50359c 100644 --- a/include/qb/qbrb.h +++ b/include/qb/qbrb.h @@ -221,6 +221,14 @@ ssize_t qb_rb_chunk_read(qb_ringbuffer_t * rb, void *data_out, size_t len, int32_t ms_timeout);
/** + * Get the reference count. + * + * @param rb ringbuffer instance + * @return the number of references + */ +int32_t qb_rb_refcount_get(qb_ringbuffer_t * rb); + +/** * The amount of free space in the ring buffer. * * @note Some of this space will be consumed by the chunk headers. diff --git a/lib/ringbuffer.c b/lib/ringbuffer.c index ad3c50d..7524c73 100644 --- a/lib/ringbuffer.c +++ b/lib/ringbuffer.c @@ -214,6 +214,7 @@ void qb_rb_close(qb_ringbuffer_t * rb) "Destroying ringbuffer: %s", rb->shared_hdr->hdr_path);
+ (void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count); (void)rb->sem_destroy_fn(rb); unlink(rb->shared_hdr->data_path); unlink(rb->shared_hdr->hdr_path); @@ -232,6 +233,11 @@ void *qb_rb_shared_user_data_get(qb_ringbuffer_t * rb) return rb->shared_hdr->user_data; }
+int32_t qb_rb_refcount_get(qb_ringbuffer_t * rb) +{ + return qb_atomic_int_get(&rb->shared_hdr->ref_count); +} + ssize_t qb_rb_space_free(qb_ringbuffer_t * rb) { uint32_t write_size;
This makes it possible the detect the loss of the server and return -ENOTCONN.
Signed-off-by: Angus Salkeld asalkeld@redhat.com --- lib/ipc_shm.c | 5 +++++ lib/ipcc.c | 53 +++++++++++++++++++++++++++++++++++++++++++++-------- tests/bmc.c | 2 +- 3 files changed, 51 insertions(+), 9 deletions(-)
diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c index 55670dc..0cf3ecf 100644 --- a/lib/ipc_shm.c +++ b/lib/ipc_shm.c @@ -123,6 +123,11 @@ static void qb_ipc_shm_fc_set(struct qb_ipc_one_way *one_way, static int32_t qb_ipc_shm_fc_get(struct qb_ipc_one_way *one_way) { int32_t *fc; + int32_t rc = qb_rb_refcount_get(one_way->u.shm.rb); + + if (rc != 2) { + return -ENOTCONN; + } fc = qb_rb_shared_user_data_get(one_way->u.shm.rb); return *fc; } diff --git a/lib/ipcc.c b/lib/ipcc.c index 8248cdd..8ef1812 100644 --- a/lib/ipcc.c +++ b/lib/ipcc.c @@ -87,8 +87,17 @@ ssize_t qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr, if (msg_len > c->request.max_msg_size) { return -EINVAL; } - if (c->funcs.fc_get && c->funcs.fc_get(&c->request)) { - return -EAGAIN; + if (c->funcs.fc_get) { + res = c->funcs.fc_get(&c->request); + if (res < 0) { + return res; + } else if (res > 0) { + return -EAGAIN; + } else { + /* + * we can transmit + */ + } }
res = c->funcs.send(&c->request, msg_ptr, msg_len); @@ -96,6 +105,9 @@ ssize_t qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr, do { res2 = qb_ipc_us_send(&c->setup, msg_ptr, 1); } while (res2 == -EAGAIN); + if (res2 == -EPIPE) { + return -ENOTCONN; + } if (res2 != 1) { res = res2; } @@ -109,6 +121,7 @@ ssize_t qb_ipcc_sendv(struct qb_ipcc_connection* c, const struct iovec* iov, int32_t total_size = 0; int32_t i; int32_t res; + int32_t res2;
for (i = 0; i < iov_len; i++) { total_size += iov[i].iov_len; @@ -117,15 +130,30 @@ ssize_t qb_ipcc_sendv(struct qb_ipcc_connection* c, const struct iovec* iov, return -EINVAL; }
- if (c->funcs.fc_get && c->funcs.fc_get(&c->request)) { - return -EAGAIN; + if (c->funcs.fc_get) { + res = c->funcs.fc_get(&c->request); + if (res < 0) { + return res; + } else if (res > 0) { + return -EAGAIN; + } else { + /* + * we can transmit + */ + } }
res = c->funcs.sendv(&c->request, iov, iov_len); if (res > 0 && c->needs_sock_for_poll) { do { - res = qb_ipc_us_send(&c->setup, &res, 1); - } while (res == -EAGAIN); + res2 = qb_ipc_us_send(&c->setup, &res, 1); + } while (res2 == -EAGAIN); + if (res2 == -EPIPE) { + return -ENOTCONN; + } + if (res2 != 1) { + res = res2; + } } return res; } @@ -155,8 +183,17 @@ ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c, { ssize_t res = 0;
- if (c->funcs.fc_get && c->funcs.fc_get(&c->request)) { - return -EAGAIN; + if (c->funcs.fc_get) { + res = c->funcs.fc_get(&c->request); + if (res < 0) { + return res; + } else if (res > 0) { + return -EAGAIN; + } else { + /* + * we can transmit + */ + } }
res = qb_ipcc_sendv(c, iov, iov_len); diff --git a/tests/bmc.c b/tests/bmc.c index e49d0e6..85db32d 100644 --- a/tests/bmc.c +++ b/tests/bmc.c @@ -88,7 +88,7 @@ repeat_send: if (res < 0) { if (res == -EAGAIN) { goto repeat_send; - } else if (res == -EINVAL || res == -EINTR) { + } else if (res == -EINVAL || res == -EINTR || res == -ENOTCONN) { perror("qb_ipcc_send"); return -1; } else {
quarterback-devel@lists.fedorahosted.org