Note: the POSIX message queue (pmq) test needs root privileges; run it with "sudo make check".
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcc.h | 2 +-
lib/ipc_posix_mq.c | 77 +++++++++++++++++++++++++++++---------------------
lib/ipc_sysv_mq.c | 45 ++++++++++++++++-------------
lib/ipc_us.c | 23 ++++++++++++--
lib/ipcc.c | 4 +-
lib/ipcs.c | 8 ++++-
tests/bmc.c | 4 +--
tests/bmcpt.c | 2 +-
tests/check_ipc.c | 76 +++++++++++++++++++++++++++++++++++++------------
9 files changed, 157 insertions(+), 84 deletions(-)
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index 6f16fb4..b02c8c6 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -37,7 +37,7 @@ extern "C" {
typedef struct qb_ipcc_connection qb_ipcc_connection_t;
qb_ipcc_connection_t*
-qb_ipcc_connect(const char *name, enum qb_ipc_type type);
+qb_ipcc_connect(const char *name);
int32_t qb_ipcc_send(qb_ipcc_connection_t* c, const void *msg_ptr,
size_t msg_len);
diff --git a/lib/ipc_posix_mq.c b/lib/ipc_posix_mq.c
index ef3fa60..57f7350 100644
--- a/lib/ipc_posix_mq.c
+++ b/lib/ipc_posix_mq.c
@@ -39,29 +39,42 @@ static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len)
int32_t msgsize_max;
char size_str[10];
int32_t res = 0;
+ int32_t size = 0;
#ifdef QB_LINUX
struct rlimit rlim;
int32_t q_limit;
#endif /* QB_LINUX */
proc_fd = fopen("/proc/sys/fs/mqueue/msgsize_max", "r+");
- if (proc_fd > 0) {
- res = fscanf(proc_fd, "%d", &msgsize_max);
- } else {
+ if (proc_fd == NULL) {
res = -errno;
- qb_util_log(LOG_ERR, "fopen failed");
+ qb_util_log(LOG_ERR,
+ "failed to open \"%s\": %s",
+ "/proc/sys/fs/mqueue/msgsize_max",
+ strerror(errno));
+ }
+
+ if (res == 0) {
+ res = fscanf(proc_fd, "%d", &msgsize_max);
+ if (res < 0) {
+ res = -errno;
+ qb_util_log(LOG_ERR, "fscanf failed: %s", strerror(errno));
+ }
}
+
if (res == 1) {
if (msgsize_max <= max_msg_size) {
/* we need to increase the size */
- snprintf(size_str, 10, "%zd", (max_msg_size + 1));
- fwrite(size_str, 1, strlen(size_str), proc_fd);
+ res = snprintf(size_str, 10, "%zd", (max_msg_size + 1));
+ size = fwrite(size_str, 1, strlen(size_str), proc_fd);
+ if (res != size) {
+ res = -errno;
+ }
}
- } else {
- qb_util_log(LOG_ERR, "fscanf failed");
- return res;
}
- fclose(proc_fd);
+ if (proc_fd) {
+ fclose(proc_fd);
+ }
#ifdef QB_LINUX
if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
@@ -94,15 +107,19 @@ static mqd_t posix_mq_create(const char *mq_name, size_t max_msg_size,
attr.mq_maxmsg = q_len;
attr.mq_msgsize = max_msg_size;
- mq_unlink(mq_name);
+ if (mq_unlink(mq_name) == -1) {
+ if (errno == EACCES) {
+ qb_util_log(LOG_ERR, "Can't remove old mq \"%s\" : %s",
+ mq_name, strerror(errno));
+ return -1;
+ }
+ }
res = mq_open(mq_name, flags, m, &attr);
if (res == (mqd_t)-1) {
- perror(mq_name);
+ qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s",
+ mq_name, strerror(errno));
}
- printf("%s(%s, %zd, %d) == %d\n",
- __func__, mq_name, max_msg_size, flags, res);
-
return res;
}
@@ -116,10 +133,10 @@ static int32_t qb_ipcc_pmq_send(struct qb_ipcc_connection *c,
const void *msg_ptr, size_t msg_len)
{
int32_t res = mq_send(c->u.pmq.request.q, msg_ptr, msg_len, 1);
- if (res < 0) {
+ if (res != 0) {
return -errno;
}
- return 0;
+ return msg_len;
}
static ssize_t qb_ipcc_pmq_recv(struct qb_ipcc_connection *c,
@@ -131,14 +148,14 @@ static ssize_t qb_ipcc_pmq_recv(struct qb_ipcc_connection *c,
if (res < 0) {
return -errno;
}
- return 0;
+ return res;
}
static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_request_header hdr;
- printf("%s()\n", __func__);
+ qb_util_log(LOG_DEBUG, "%s()\n", __func__);
if (c->needs_sock_for_poll) {
return;
}
@@ -180,8 +197,9 @@ static int32_t _ipcc_pmq_connect_to_service_(struct qb_ipcc_connection *c)
perror("mq_send");
return res;
}
- printf("sent request to server %d\n", res);
- printf("mq_receive'ing on %d\n", c->u.pmq.response.q);
+
+ qb_util_log(LOG_DEBUG, "sent request to server %d\n", res);
+ qb_util_log(LOG_DEBUG, "mq_receive'ing on %d\n", c->u.pmq.response.q);
mq_recv_again:
size = mq_receive(c->u.pmq.response.q, c->receive_buf,
@@ -196,7 +214,7 @@ mq_recv_again:
perror("_ipcc_pmq_connect_to_service_:mq_receive");
goto cleanup;
}
- printf("received response from server %zd\n", size);
+ qb_util_log(LOG_DEBUG, "received response from server %zd\n", size);
msg_res = (struct mar_res_setup *)c->receive_buf;
res = msg_res->hdr.error;
if (res == 0) {
@@ -274,8 +292,6 @@ int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection * c)
return 0;
}
- printf("%s:%d\n", __FILE__, __LINE__);
-
mq_close(c->u.pmq.dispatch.q);
mq_unlink(c->u.pmq.dispatch.name);
@@ -311,8 +327,11 @@ static void qb_ipcs_pmq_destroy(struct qb_ipcs_service *s)
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
+ char mq_name[NAME_MAX];
+
+ snprintf(mq_name, NAME_MAX, "/%s", s->name);
- printf("%s\n", __func__);
+ qb_util_log(LOG_DEBUG, "%s\n", __func__);
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
@@ -328,7 +347,7 @@ static void qb_ipcs_pmq_destroy(struct qb_ipcs_service *s)
if (mq_close(s->u.q) == -1)
perror("mq_close");
- if (mq_unlink(s->name) == -1)
+ if (mq_unlink(mq_name) == -1)
perror("mq_unlink");
}
@@ -351,23 +370,18 @@ static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s,
O_WRONLY | O_NONBLOCK);
if (c->u.pmq.response.q == (mqd_t)-1) {
res = -errno;
- perror("mq_open:RESPONSE");
return res;
}
- qb_util_log(LOG_DEBUG, "%s:%s (fd==%d)",
- __func__, c->u.pmq.response.name, c->u.pmq.response.q);
/* setup the dispatch message queue
*/
posix_mq_increase_limits(c->service->max_msg_size, 10);
strcpy(c->u.pmq.dispatch.name, init->dispatch_mq);
- qb_util_log(LOG_DEBUG, "%s:%s", __func__, c->u.pmq.dispatch.name);
c->u.pmq.dispatch.q = mq_open(c->u.pmq.dispatch.name,
O_WRONLY | O_NONBLOCK);
if (c->u.pmq.dispatch.q == (mqd_t)-1) {
res = -errno;
- perror("mq_open:DISPATCH");
goto cleanup_response;
}
@@ -457,7 +471,6 @@ int32_t qb_ipcs_pmq_create(struct qb_ipcs_service * s)
(O_RDONLY | O_CREAT | O_EXCL | O_NONBLOCK));
if (s->u.q == (mqd_t)-1) {
res = -errno;
- perror("posix_mq_create:REQUEST");
return res;
}
qb_util_log(LOG_DEBUG, "%s() %d", __func__, s->u.q);
diff --git a/lib/ipc_sysv_mq.c b/lib/ipc_sysv_mq.c
index 30a8c27..7bdb94d 100644
--- a/lib/ipc_sysv_mq.c
+++ b/lib/ipc_sysv_mq.c
@@ -37,14 +37,13 @@ static ssize_t qb_ipcs_smq_dispatch_send(struct qb_ipcs_connection *c,
* utility functions
* --------------------------------------------------------
*/
-static int32_t sysv_mq_create(struct qb_ipcs_service *s,
- struct qb_ipcc_smq_one_way *mq)
+static int32_t sysv_mq_create(struct qb_ipcs_service *s)
{
struct msqid_ds info;
int32_t res = 0;
- mq->q = msgget(mq->key, IPC_CREAT | IPC_NOWAIT);
- if (mq->q == -1) {
+ s->u.smq.q = msgget(s->u.smq.key, IPC_CREAT | O_EXCL | IPC_NOWAIT);
+ if (s->u.smq.q == -1) {
res = -errno;
perror("msgget:REQUEST");
return res;
@@ -99,22 +98,27 @@ static key_t sysv_key_from_name(const char *name)
static int32_t qb_ipcc_smq_send(struct qb_ipcc_connection *c,
const void *msg_ptr, size_t msg_len)
{
- //printf("%s()\n", __func__);
- return msgsnd(c->u.smq.request.q, msg_ptr, msg_len, 0);
+ int32_t res = msgsnd(c->u.smq.request.q, msg_ptr, msg_len, 0);
+ if (res == -1) {
+ return -errno;
+ }
+ return msg_len;
}
static ssize_t qb_ipcc_smq_recv(struct qb_ipcc_connection *c,
const void *msg_ptr, size_t msg_len)
{
- int32_t res;
+ ssize_t res;
res = msgrcv(c->u.smq.response.q, (char *)msg_ptr,
c->max_msg_size, 0, IPC_NOWAIT);
- //printf("%s() %d\n", __func__, res);
if (res == -1 && errno == ENOMSG) {
/* just to be consistent with other IPC types.
*/
- errno = EAGAIN;
+ return -EAGAIN;
+ }
+ if (res == -1) {
+ return -errno;
}
return res;
}
@@ -123,7 +127,7 @@ static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c)
{
struct qb_ipc_request_header hdr;
- printf("%s()\n", __func__);
+ qb_util_log(LOG_DEBUG, "%s()\n", __func__);
if (c->needs_sock_for_poll) {
return;
}
@@ -141,6 +145,7 @@ static int32_t _smq_connect_to_service_(struct qb_ipcc_connection *c)
{
int32_t res;
ssize_t size;
+ int32_t waited;
struct mar_req_smq_setup start;
struct mar_res_setup *msg_res;
@@ -162,22 +167,24 @@ static int32_t _smq_connect_to_service_(struct qb_ipcc_connection *c)
perror("msgsnd");
return res;
}
- printf("sent request to server %d\n", res);
+ qb_util_log(LOG_DEBUG, "sent request to server %d\n", res);
+ waited = 0;
mq_recv_again:
size = msgrcv(c->u.smq.response.q, c->receive_buf,
c->max_msg_size, 0, IPC_NOWAIT);
- if (size == -1 && (errno == EAGAIN || errno == ENOMSG)) {
+ if (size == -1 && (errno == EAGAIN || errno == ENOMSG) && waited < 10) {
usleep(100000);
+ waited++;
goto mq_recv_again;
}
if (size == -1) {
- res = errno;
+ res = -errno;
perror("msgrcv");
goto cleanup;
}
- printf("received response from server %zd\n", size);
+ qb_util_log(LOG_DEBUG, "received response from server %zd\n", size);
msg_res = (struct mar_res_setup *)c->receive_buf;
res = msg_res->hdr.error;
if (res == 0) {
@@ -242,8 +249,6 @@ int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection * c)
return 0;
}
- printf("%s:%d\n", __FILE__, __LINE__);
-
msgctl(c->u.smq.dispatch.q, IPC_RMID, NULL);
cleanup_request_response:
@@ -280,7 +285,7 @@ static void qb_ipcs_smq_destroy(struct qb_ipcs_service *s)
struct qb_list_head *iter;
struct qb_list_head *iter_next;
- printf("%s\n", __func__);
+ qb_util_log(LOG_DEBUG, "%s\n", __func__);
for (iter = s->connections.next;
iter != &s->connections; iter = iter_next) {
@@ -392,7 +397,7 @@ static ssize_t qb_ipcs_smq_response_send(struct qb_ipcs_connection *c,
if (res == -1) {
return -errno;
}
- return res;
+ return size;
}
static ssize_t qb_ipcs_smq_dispatch_send(struct qb_ipcs_connection *c,
@@ -402,7 +407,7 @@ static ssize_t qb_ipcs_smq_dispatch_send(struct qb_ipcs_connection *c,
if (res == -1) {
return -errno;
}
- return res;
+ return size;
}
int32_t qb_ipcs_smq_create(struct qb_ipcs_service * s)
@@ -424,5 +429,5 @@ int32_t qb_ipcs_smq_create(struct qb_ipcs_service * s)
s->max_msg_size = MSGMAX;
- return sysv_mq_create(s, &s->u.smq);
+ return sysv_mq_create(s);
}
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index f3d83d0..1e432c5 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -292,7 +292,7 @@ int32_t qb_ipcc_us_connect(const char *socket_name, int32_t * sock_pt)
}
#ifdef SO_NOSIGPIPE
socket_nosigpipe(request_fd);
-#endif
+#endif /* SO_NOSIGPIPE */
memset(&address, 0, sizeof(struct sockaddr_un));
address.sun_family = AF_UNIX;
@@ -479,7 +479,12 @@ int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s)
#if !defined(QB_LINUX)
res = chmod(un_addr.sun_path, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
- listen(s->server_sock, SERVER_BACKLOG);
+ if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
+ strerror_r(errno, error_str, 100);
+ qb_util_log(LOG_ERR,
+ "listen failed: %s.\n",
+ error_str);
+ }
qb_poll_dispatch_add(s->poll_handle, s->server_sock,
POLLIN | POLLPRI | POLLNVAL,
@@ -493,6 +498,8 @@ error_close:
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
+ qb_util_log(LOG_INFO,
+ "withdrawing server sockets\n");
shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock);
return 0;
@@ -511,16 +518,24 @@ static int32_t qb_ipcs_us_connection_acceptor(qb_handle_t handle,
char error_str[100];
retry_accept:
+ errno = 0;
new_fd = accept(fd, (struct sockaddr *)&un_addr, &addrlen);
if (new_fd == -1 && errno == EINTR) {
goto retry_accept;
}
+ if (new_fd == -1 && errno == EBADF) {
+ strerror_r(errno, error_str, 100);
+ qb_util_log(LOG_ERR,
+ "Could not accept Library connection:(fd: %d) [%d] %s\n",
+ fd, errno, error_str);
+ return (-1);
+ }
if (new_fd == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
- "Could not accept Library connection: %s\n",
- error_str);
+ "Could not accept Library connection: [%d] %s\n",
+ errno, error_str);
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
}
diff --git a/lib/ipcc.c b/lib/ipcc.c
index 866d285..fab9d49 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -25,7 +25,7 @@
#include "util_int.h"
#include <qb/qbipcc.h>
-qb_ipcc_connection_t *qb_ipcc_connect(const char *name, enum qb_ipc_type type)
+qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
{
int32_t res;
int32_t usock;
@@ -36,6 +36,7 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name, enum qb_ipc_type type)
res = qb_ipcc_us_connect(name, &usock);
if (res != 0) {
errno = -res;
+ perror("qb_ipcc_us_connect");
return NULL;
}
@@ -43,7 +44,6 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name, enum qb_ipc_type type)
init_req.hdr.size = sizeof(init_req);
res = qb_ipc_us_send(usock, &init_req, init_req.hdr.size);
if (res < 0) {
- errno = -res;
perror("qb_ipc_us_send");
qb_ipcc_us_disconnect(usock);
errno = -res;
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 16d6847..a7db325 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -58,6 +58,7 @@ qb_ipcs_service_pt qb_ipcs_create(const char *name, enum qb_ipc_type type,
default:
qb_hdb_handle_destroy(&qb_ipc_services, h);
errno = EINVAL;
+ h = 0;
break;
}
qb_hdb_handle_put(&qb_ipc_services, h);
@@ -103,8 +104,6 @@ int32_t qb_ipcs_run(qb_ipcs_service_pt pt, qb_handle_t poll_handle)
s->poll_handle = poll_handle;
res = qb_ipcs_us_publish(s);
- qb_util_log(LOG_INFO, "%d", res);
-
if (res < 0) {
qb_hdb_handle_put(&qb_ipc_services, pt);
return res;
@@ -127,6 +126,11 @@ int32_t qb_ipcs_run(qb_ipcs_service_pt pt, qb_handle_t poll_handle)
res = -EINVAL;
break;
}
+
+ if (res < 0) {
+ qb_ipcs_us_withdraw(s);
+ }
+
qb_hdb_handle_put(&qb_ipc_services, pt);
return res;
}
diff --git a/tests/bmc.c b/tests/bmc.c
index eff8a86..340f9c9 100644
--- a/tests/bmc.c
+++ b/tests/bmc.c
@@ -182,9 +182,7 @@ int32_t main(int32_t argc, char *argv[])
signal(SIGINT, sigterm_handler);
signal(SIGILL, sigterm_handler);
signal(SIGTERM, sigterm_handler);
- conn = qb_ipcc_connect("bm1", QB_IPC_SHM);
-// conn = qb_ipcc_connect("bm1", QB_IPC_POSIX_MQ);
-// conn = qb_ipcc_connect("bm1", QB_IPC_SYSV_MQ);
+ conn = qb_ipcc_connect("bm1");
if (conn == NULL) {
perror("qb_ipcc_connect");
exit(1);
diff --git a/tests/bmcpt.c b/tests/bmcpt.c
index d1029dd..4a07b55 100644
--- a/tests/bmcpt.c
+++ b/tests/bmcpt.c
@@ -82,7 +82,7 @@ static void bm_finish(struct bm_ctx *ctx, const char *operation, int32_t size)
static void bmc_connect(struct bm_ctx *ctx)
{
- ctx->conn = qb_ipcc_connect("bm1", QB_IPC_SHM);
+ ctx->conn = qb_ipcc_connect("bm1");
}
static void bmc_disconnect(struct bm_ctx *ctx)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index bc75d44..2965104 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -34,8 +34,10 @@
#include <qb/qbipcs.h>
#include <qb/qbpoll.h>
+#define IPC_NAME "ipc_test"
#define MAX_MSG_SIZE (8192*16)
static qb_ipcc_connection_t *conn;
+static enum qb_ipc_type ipc_type;
/* Test Cases
*
@@ -91,6 +93,8 @@ static void ipc_log_fn(const char *file_name,
static void run_ipc_server(void)
{
+ int32_t res;
+
struct qb_ipcs_service_handlers sh = {
.connection_authenticate = NULL,
.connection_created = NULL,
@@ -98,17 +102,17 @@ static void run_ipc_server(void)
.connection_destroyed = NULL,
};
signal(SIGTERM, sigterm_handler);
- qb_util_set_log_function(ipc_log_fn);
bms_poll_handle = qb_poll_create();
- s1 = qb_ipcs_create("bm1", QB_IPC_SHM, MAX_MSG_SIZE);
- if (s1 == 0) {
- perror("qb_ipcs_create");
- exit(1);
- }
+ s1 = qb_ipcs_create(IPC_NAME, ipc_type, MAX_MSG_SIZE);
+ fail_if(s1 == 0);
+
qb_ipcs_service_handlers_set(s1, &sh);
- qb_ipcs_run(s1, bms_poll_handle);
+
+ res = qb_ipcs_run(s1, bms_poll_handle);
+ ck_assert_int_eq(res, 0);
+
qb_poll_run(bms_poll_handle);
}
@@ -177,19 +181,26 @@ repeat_send:
return 0;
}
-START_TEST(test_ipc_txrx_shm)
+
+static void test_ipc_txrx(void)
{
int32_t j;
+ int32_t c = 0;
size_t size;
- pid_t pid = run_function_in_new_process(run_ipc_server);
+ pid_t pid;
+
+ pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
do {
- conn = qb_ipcc_connect("bm1", QB_IPC_SHM);
- if (conn == NULL) usleep(10000);
- } while (conn == NULL);
-
+ conn = qb_ipcc_connect(IPC_NAME);
+ if (conn == NULL) {
+ sleep(1);
+ c++;
+ }
+ } while (conn == NULL && c < 5);
fail_if(conn == NULL);
+
for (j = 1; j < 19; j++) {
size = (10 * j * j * j) + sizeof(struct qb_ipc_request_header);
if (size >= MAX_MSG_SIZE)
@@ -198,21 +209,48 @@ START_TEST(test_ipc_txrx_shm)
break;
}
}
-
qb_ipcc_disconnect(conn);
-
stop_process(pid);
}
+
+START_TEST(test_ipc_txrx_shm)
+{
+ ipc_type = QB_IPC_SHM;
+ test_ipc_txrx();
+}
+END_TEST
+
+START_TEST(test_ipc_txrx_pmq)
+{
+ ipc_type = QB_IPC_POSIX_MQ;
+ test_ipc_txrx();
+}
+END_TEST
+
+START_TEST(test_ipc_txrx_smq)
+{
+ ipc_type = QB_IPC_SYSV_MQ;
+ test_ipc_txrx();
+}
END_TEST
static Suite *ipc_suite(void)
{
- TCase *tc_load;
+ TCase *tc;
Suite *s = suite_create("ipc");
- tc_load = tcase_create("test01");
- tcase_add_test(tc_load, test_ipc_txrx_shm);
- suite_add_tcase(s, tc_load);
+ tc = tcase_create("ipc_txrx_shm");
+ tcase_add_test(tc, test_ipc_txrx_shm);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("ipc_txrx_posix_mq");
+ tcase_add_test(tc, test_ipc_txrx_pmq);
+ tcase_set_timeout(tc, 10);
+ suite_add_tcase(s, tc);
+
+// tc = tcase_create("ipc_txrx_sysv_mq");
+// tcase_add_test(tc, test_ipc_txrx_smq);
+// suite_add_tcase(s, tc);
return s;
}
--
1.7.2.2