Create the queues/ringbuffers on the server so that we don't
have to modify /proc entries, then chown them so that
the clients can access them.
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipc_common.h | 1 -
include/qb/qbipcc.h | 2 +-
include/qb/qbipcs.h | 3 +-
lib/ipc_int.h | 154 ++++++------------
lib/ipc_posix_mq.c | 390 +++++++++++++++++----------------------------
lib/ipc_shm.c | 251 ++++++++++--------------------
lib/ipc_sysv_mq.c | 298 +++++++++++------------------------
lib/ipc_us.c | 84 +++++-----
lib/ipcc.c | 47 +++---
lib/ipcs.c | 61 ++-----
lib/util.c | 6 +
tests/bmc.c | 7 +-
tests/bmcpt.c | 4 +-
tests/bms.c | 2 +-
14 files changed, 472 insertions(+), 838 deletions(-)
diff --git a/include/qb/qbipc_common.h b/include/qb/qbipc_common.h
index b9abf4e..0cae82c 100644
--- a/include/qb/qbipc_common.h
+++ b/include/qb/qbipc_common.h
@@ -26,7 +26,6 @@
struct qb_ipc_request_header {
int32_t id __attribute__ ((aligned(8)));
int32_t size __attribute__ ((aligned(8)));
- uint64_t session_id __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
struct qb_ipc_response_header {
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index 2a1eae1..e1cda6d 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -37,7 +37,7 @@ extern "C" {
typedef struct qb_ipcc_connection qb_ipcc_connection_t;
qb_ipcc_connection_t*
-qb_ipcc_connect(const char *name);
+qb_ipcc_connect(const char *name, size_t max_msg_size);
void qb_ipcc_disconnect(qb_ipcc_connection_t* c);
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 996c914..d7dff7d 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -63,8 +63,7 @@ struct qb_ipcs_service_handlers {
};
qb_ipcs_service_pt qb_ipcs_create(const char* name,
- enum qb_ipc_type type,
- size_t max_msg_size);
+ enum qb_ipc_type type);
void qb_ipcs_service_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_service_handlers *handlers);
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index b43f7a5..131232a 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -49,6 +49,30 @@
#include <semaphore.h>
#endif
+/*
+Client Server
+SEND CONN REQ ->
+ ACCEPT & CREATE queues
+ or DENY
+ <- SEND ACCEPT(with details)/DENY
+*/
+
+struct qb_ipc_connection_request {
+ struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
+ uint32_t max_msg_size __attribute__ ((aligned(8)));
+} __attribute__ ((aligned(8)));
+
+struct qb_ipc_connection_response {
+ struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
+ int32_t connection_type __attribute__ ((aligned(8)));
+ uint32_t max_msg_size __attribute__ ((aligned(8)));
+ char request[PATH_MAX] __attribute__ ((aligned(8)));
+ char response[PATH_MAX] __attribute__ ((aligned(8)));
+ char event[PATH_MAX] __attribute__ ((aligned(8)));
+} __attribute__ ((aligned(8)));
+
+
+
struct qb_ipcc_connection;
struct qb_ipcc_funcs {
@@ -62,52 +86,31 @@ struct qb_ipcc_funcs {
void (*disconnect)(struct qb_ipcc_connection* c);
};
-struct qb_ipcc_pmq_one_way {
- mqd_t q;
- char name[NAME_MAX];
-};
-
-struct qb_ipcc_smq_one_way {
- int32_t q;
- int32_t key;
-};
-
-struct qb_ipcc_shm_one_way {
- qb_ringbuffer_t *rb;
- char name[NAME_MAX];
-};
-
-struct qb_ipcc_pmq_connection {
- struct qb_ipcc_pmq_one_way request;
- struct qb_ipcc_pmq_one_way response;
- struct qb_ipcc_pmq_one_way event;
-};
-
-struct qb_ipcc_smq_connection {
- struct qb_ipcc_smq_one_way request;
- struct qb_ipcc_smq_one_way response;
- struct qb_ipcc_smq_one_way event;
-};
-
-struct qb_ipcc_shm_connection {
- struct qb_ipcc_shm_one_way request;
- struct qb_ipcc_shm_one_way response;
- struct qb_ipcc_shm_one_way event;
+union qb_ipc_one_way {
+ struct {
+ mqd_t q;
+ char name[NAME_MAX];
+ } pmq;
+ struct {
+ int32_t q;
+ int32_t key;
+ } smq;
+ struct {
+ qb_ringbuffer_t *rb;
+ char name[NAME_MAX];
+ } shm;
};
struct qb_ipcc_connection {
- enum qb_ipc_type type;
char name[NAME_MAX];
- uint64_t session_id;
+ enum qb_ipc_type type;
+ size_t max_msg_size;
int32_t needs_sock_for_poll;
int32_t sock;
- union {
- struct qb_ipcc_pmq_connection pmq;
- struct qb_ipcc_smq_connection smq;
- struct qb_ipcc_shm_connection shm;
- } u;
+ union qb_ipc_one_way request;
+ union qb_ipc_one_way response;
+ union qb_ipc_one_way event;
struct qb_ipcc_funcs funcs;
- size_t max_msg_size;
char *receive_buf;
};
@@ -120,10 +123,10 @@ int32_t qb_ipcc_us_connect(const char *socket_name, int32_t
*sock_pt);
void qb_ipcc_us_disconnect (int32_t sock);
-int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c);
-int32_t qb_ipcc_soc_connect(struct qb_ipcc_connection *c);
-int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c);
-int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c);
+int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c, struct
qb_ipc_connection_response * response);
+int32_t qb_ipcc_soc_connect(struct qb_ipcc_connection *c, struct
qb_ipc_connection_response * response);
+int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c, struct
qb_ipc_connection_response * response);
+int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c, struct
qb_ipc_connection_response * response);
struct qb_ipcs_service;
struct qb_ipcs_connection;
@@ -131,9 +134,9 @@ struct qb_ipcs_connection;
struct qb_ipcs_funcs {
void (*destroy)(struct qb_ipcs_service *s);
int32_t (*connect)(struct qb_ipcs_service *s, struct qb_ipcs_connection *c,
- void *data, size_t size);
+ struct qb_ipc_connection_response *r);
void (*disconnect)(struct qb_ipcs_connection *c);
- ssize_t (*request_recv)(struct qb_ipcs_service *s, void *buf, size_t buf_size);
+ ssize_t (*request_recv)(struct qb_ipcs_connection *c, void *buf, size_t buf_size);
ssize_t (*response_send)(struct qb_ipcs_connection *c, void *data, size_t size);
ssize_t (*event_send)(struct qb_ipcs_connection *c, void *data, size_t size);
};
@@ -151,13 +154,6 @@ struct qb_ipcs_service {
struct qb_ipcs_funcs funcs;
struct qb_list_head connections;
- union {
- mqd_t q;
- qb_ringbuffer_t *rb;
- struct qb_ipcc_smq_one_way smq;
- } u;
- size_t max_msg_size;
- char *receive_buf;
};
struct qb_ipcs_connection {
@@ -166,13 +162,13 @@ struct qb_ipcs_connection {
uid_t euid;
gid_t egid;
int32_t sock;
- union {
- struct qb_ipcc_pmq_connection pmq;
- struct qb_ipcc_smq_connection smq;
- struct qb_ipcc_shm_connection shm;
- } u;
+ size_t max_msg_size;
+ union qb_ipc_one_way request;
+ union qb_ipc_one_way response;
+ union qb_ipc_one_way event;
struct qb_ipcs_service *service;
struct qb_list_head list;
+ char *receive_buf;
};
int32_t qb_ipcs_pmq_create(struct qb_ipcs_service *s);
@@ -194,48 +190,4 @@ int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
void qb_ipcs_disconnect(struct qb_ipcs_connection *c);
-struct mar_req_initial_setup {
- struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
-} __attribute__ ((aligned(8)));
-
-struct mar_res_initial_setup {
- struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
- int32_t connection_type __attribute__ ((aligned(8)));
- uint64_t session_id __attribute__ ((aligned(8)));
- uint32_t max_msg_size __attribute__ ((aligned(8)));
-} __attribute__ ((aligned(8)));
-
-struct mar_req_shm_setup {
- struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
- uint32_t pid __attribute__ ((aligned(8)));
- char request[PATH_MAX] __attribute__ ((aligned(8)));
- char response[PATH_MAX] __attribute__ ((aligned(8)));
- char event[PATH_MAX] __attribute__ ((aligned(8)));
-} __attribute__ ((aligned(8)));
-
-struct mar_req_pmq_setup {
- struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
- uint32_t pid __attribute__ ((aligned(8)));
- char response_mq[NAME_MAX] __attribute__ ((aligned(8)));
- char event_mq[NAME_MAX] __attribute__ ((aligned(8)));
-} __attribute__ ((aligned(8)));
-
-struct mar_req_smq_setup {
- struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
- uint32_t pid __attribute__ ((aligned(8)));
- int32_t response_key __attribute__ ((aligned(8)));
- int32_t event_key __attribute__ ((aligned(8)));
-} __attribute__ ((aligned(8)));
-
-struct mar_res_setup {
- struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
- size_t max_msg_size __attribute__ ((aligned(8)));
-} __attribute__ ((aligned(8)));
-
-typedef struct {
- uid_t euid __attribute__ ((aligned(8)));
- gid_t egid __attribute__ ((aligned(8)));
-} mar_req_priv_change __attribute__ ((aligned(8)));
-
-
#endif /* QB_IPC_INT_H_DEFINED */
diff --git a/lib/ipc_posix_mq.c b/lib/ipc_posix_mq.c
index ff8c0ed..3c7f08d 100644
--- a/lib/ipc_posix_mq.c
+++ b/lib/ipc_posix_mq.c
@@ -27,7 +27,16 @@
#include <qb/qbpoll.h>
static ssize_t qb_ipcs_pmq_event_send(struct qb_ipcs_connection *c,
- void *data, size_t size);
+ void *data, size_t size);
+
+#define QB_REQUEST_Q_LEN 3
+#define QB_RESPONSE_Q_LEN 1
+#define QB_EVENT_Q_LEN 3
+
+static size_t q_space_used = 0;
+#ifdef QB_LINUX
+#define QB_RLIMIT_CHANGE_NEEDED 1
+#endif /* QB_LINUX */
/*
* utility functions
@@ -35,95 +44,109 @@ static ssize_t qb_ipcs_pmq_event_send(struct qb_ipcs_connection *c,
*/
static int32_t posix_mq_increase_limits(size_t max_msg_size, int32_t q_len)
{
- FILE *proc_fd;
- int32_t msgsize_max;
- char size_str[10];
int32_t res = 0;
- int32_t size = 0;
-#ifdef QB_LINUX
+#ifdef QB_RLIMIT_CHANGE_NEEDED
struct rlimit rlim;
- int32_t q_limit;
-#endif /* QB_LINUX */
+ size_t q_space_needed;
+#endif /* QB_RLIMIT_CHANGE_NEEDED */
- proc_fd = fopen("/proc/sys/fs/mqueue/msgsize_max", "r+");
- if (proc_fd == NULL) {
+#ifdef QB_RLIMIT_CHANGE_NEEDED
+ if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno;
- qb_util_log(LOG_ERR,
- "failed to open \"%s\": %s",
- "/proc/sys/fs/mqueue/msgsize_max",
- strerror(errno));
+ qb_util_log(LOG_ERR, "getrlimit failed");
+ return res;
}
+ q_space_needed = q_space_used + (max_msg_size * q_len * 4 / 3);
- if (res == 0) {
- res = fscanf(proc_fd, "%d", &msgsize_max);
- if (res < 0) {
- res = -errno;
- qb_util_log(LOG_ERR, "fscanf failed: %s", strerror(errno));
- }
- }
+ qb_util_log(LOG_DEBUG, "rlimit:%d needed:%zu used:%zu",
+ (int)rlim.rlim_cur, q_space_needed, q_space_used);
- if (res == 1) {
- if (msgsize_max <= max_msg_size) {
- /* we need to increase the size */
- res = snprintf(size_str, 10, "%zd", (max_msg_size + 1));
- size = fwrite(size_str, 1, strlen(size_str), proc_fd);
- if (res != size) {
- res = -errno;
- }
- }
+ if (rlim.rlim_cur < q_space_needed) {
+ rlim.rlim_cur = q_space_needed;
}
- if (proc_fd) {
- fclose(proc_fd);
+ if (rlim.rlim_max < q_space_needed) {
+ rlim.rlim_max = q_space_needed;
}
-
-#ifdef QB_LINUX
- if (getrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
+ if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
res = -errno;
- qb_util_log(LOG_ERR, "getrlimit failed");
+ qb_util_log(LOG_ERR, "setrlimit failed");
+ }
+#endif /* QB_RLIMIT_CHANGE_NEEDED */
+ return res;
+}
+
+static int32_t posix_mq_open(struct qb_ipcc_connection *c,
+ union qb_ipc_one_way *one_way,
+ const char *name, size_t q_len)
+{
+ int32_t res = posix_mq_increase_limits(c->max_msg_size, q_len);
+ if (res != 0) {
return res;
}
- q_limit = (max_msg_size * q_len * 4) / 3;
- rlim.rlim_cur += q_limit;
- rlim.rlim_max += q_limit;
- if (setrlimit(RLIMIT_MSGQUEUE, &rlim) != 0) {
+ one_way->pmq.q = mq_open(name, O_RDWR | O_NONBLOCK);
+ if (one_way->pmq.q == (mqd_t) - 1) {
res = -errno;
- qb_util_log(LOG_ERR, "setrlimit failed");
+ perror("mq_open");
return res;
}
-#endif /* QB_LINUX */
-
+ strcpy(one_way->pmq.name, name);
+ q_space_used += c->max_msg_size * q_len;
return 0;
}
-static mqd_t posix_mq_create(const char *mq_name, size_t max_msg_size,
- int32_t flags)
+static int32_t posix_mq_create(struct qb_ipcs_connection *c,
+ union qb_ipc_one_way *one_way,
+ const char *name, size_t q_len)
{
struct mq_attr attr;
- mqd_t res = 0;
- int32_t q_len = 10;
+ mqd_t q = 0;
+ int32_t res = 0;
mode_t m = 0600;
+ size_t max_msg_size = c->max_msg_size;
+ res = posix_mq_increase_limits(max_msg_size, q_len);
+ if (res != 0) {
+ return res;
+ }
+try_smaller:
+ if (q != 0) {
+ max_msg_size = max_msg_size / 2;
+ q_len--;
+ }
+// qb_util_log(LOG_DEBUG, "%s() max_msg_size:%zu q_len:%zu", __func__,
+// max_msg_size, q_len);
attr.mq_flags = O_NONBLOCK;
attr.mq_maxmsg = q_len;
attr.mq_msgsize = max_msg_size;
- if (mq_unlink(mq_name) == -1) {
- if (errno == EACCES) {
- qb_util_log(LOG_ERR, "Can't remove old mq \"%s\" : %s",
- mq_name, strerror(errno));
- return -1;
+ q = mq_open(name, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, m, &attr);
+ if (q == (mqd_t) - 1 && errno == ENOMEM) {
+ if (max_msg_size > 9000 && q_len > 3) {
+ goto try_smaller;
}
}
- res = mq_open(mq_name, flags, m, &attr);
- if (res == (mqd_t)-1) {
+ if (q == (mqd_t) - 1) {
+ res = -errno;
qb_util_log(LOG_ERR, "Can't create mq \"%s\": %s",
- mq_name, strerror(errno));
+ name, strerror(errno));
+ return res;
+ }
+ q_space_used += max_msg_size * q_len;
+ c->max_msg_size = max_msg_size;
+ one_way->pmq.q = q;
+ strcpy(one_way->pmq.name, name);
+
+ res = fchown(q, c->euid, c->egid);
+ if (res == -1) {
+ res = -errno;
+ qb_util_log(LOG_ERR, "fchown:%s %s", name, strerror(errno));
+ mq_close(q);
+ mq_unlink(name);
}
return res;
}
-
/*
* client functions
* --------------------------------------------------------
@@ -132,7 +155,7 @@ static mqd_t posix_mq_create(const char *mq_name, size_t
max_msg_size,
static int32_t qb_ipcc_pmq_send(struct qb_ipcc_connection *c,
const void *msg_ptr, size_t msg_len)
{
- int32_t res = mq_send(c->u.pmq.request.q, msg_ptr, msg_len, 1);
+ int32_t res = mq_send(c->request.pmq.q, msg_ptr, msg_len, 1);
if (res != 0) {
return -errno;
}
@@ -143,8 +166,9 @@ static ssize_t qb_ipcc_pmq_recv(struct qb_ipcc_connection *c,
void *msg_ptr, size_t msg_len)
{
uint32_t msg_prio;
- ssize_t res = mq_receive(c->u.pmq.response.q, (char *)msg_ptr, c->max_msg_size,
- &msg_prio);
+ ssize_t res =
+ mq_receive(c->response.pmq.q, (char *)msg_ptr, c->max_msg_size,
+ &msg_prio);
if (res < 0) {
return -errno;
}
@@ -161,71 +185,21 @@ static void qb_ipcc_pmq_disconnect(struct qb_ipcc_connection *c)
}
hdr.id = QB_IPC_MSG_DISCONNECT;
- hdr.session_id = c->session_id;
hdr.size = sizeof(hdr);
- mq_send(c->u.pmq.request.q, (const char *)&hdr, hdr.size, 30);
+ mq_send(c->request.pmq.q, (const char *)&hdr, hdr.size, 30);
- mq_close(c->u.pmq.event.q);
- mq_unlink(c->u.pmq.event.name);
+ mq_close(c->event.pmq.q);
+ mq_unlink(c->event.pmq.name);
- mq_close(c->u.pmq.response.q);
- mq_unlink(c->u.pmq.response.name);
+ mq_close(c->response.pmq.q);
+ mq_unlink(c->response.pmq.name);
- mq_close(c->u.pmq.request.q);
+ mq_close(c->request.pmq.q);
+ mq_unlink(c->request.pmq.name);
}
-static int32_t _ipcc_pmq_connect_to_service_(struct qb_ipcc_connection *c)
-{
- int32_t res;
- ssize_t size;
- uint32_t priority;
- struct mar_req_pmq_setup start;
- struct mar_res_setup *msg_res;
-
- start.hdr.id = QB_IPC_MSG_CONNECT;
- start.hdr.session_id = c->session_id;
- start.pid = getpid();
- start.hdr.size = sizeof(struct mar_req_pmq_setup);
- strcpy(start.response_mq, c->u.pmq.response.name);
- strcpy(start.event_mq, c->u.pmq.event.name);
-
- res =
- mq_send(c->u.pmq.request.q, (const char *)&start, start.hdr.size,
- 30);
- if (res == -1) {
- res = -errno;
- perror("mq_send");
- return res;
- }
-
- qb_util_log(LOG_DEBUG, "sent request to server %d\n", res);
- qb_util_log(LOG_DEBUG, "mq_receive'ing on %d\n", c->u.pmq.response.q);
-
-mq_recv_again:
- size = mq_receive(c->u.pmq.response.q, c->receive_buf,
- c->max_msg_size, &priority);
-
- if (size == -1 && errno == EAGAIN) {
- usleep(100000);
- goto mq_recv_again;
- }
- if (size == -1) {
- res = -errno;
- perror("_ipcc_pmq_connect_to_service_:mq_receive");
- goto cleanup;
- }
- qb_util_log(LOG_DEBUG, "received response from server %zd\n", size);
- msg_res = (struct mar_res_setup *)c->receive_buf;
- res = msg_res->hdr.error;
- if (res == 0) {
- c->max_msg_size = msg_res->max_msg_size;
- }
-
-cleanup:
- return res;
-}
-
-int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection * c)
+int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection *c,
+ struct qb_ipc_connection_response *response)
{
int32_t res = 0;
@@ -242,70 +216,35 @@ int32_t qb_ipcc_pmq_connect(struct qb_ipcc_connection * c)
return -EINVAL;
}
- /* Connect to the service's request message queue.
- */
- posix_mq_increase_limits(c->max_msg_size, 10);
- snprintf(c->u.pmq.request.name, NAME_MAX, "/%s", c->name);
- c->u.pmq.request.q = mq_open(c->u.pmq.request.name,
- O_WRONLY | O_NONBLOCK);
- if (c->u.pmq.request.q == (mqd_t)-1) {
- res = -errno;
+ res = posix_mq_open(c, &c->request, response->request,
+ QB_REQUEST_Q_LEN);
+ if (res != 0) {
perror("mq_open:REQUEST");
return res;
}
-
- /* Create the response message queue.
- */
- res = snprintf(c->u.pmq.response.name,
- NAME_MAX, "/%s-response-%d", c->name, getpid());
-
- posix_mq_increase_limits(c->max_msg_size, 10);
- c->u.pmq.response.q = posix_mq_create(c->u.pmq.response.name,
- c->max_msg_size,
- O_RDONLY | O_CREAT | O_EXCL |
- O_NONBLOCK);
-
- if (c->u.pmq.response.q == (mqd_t)-1) {
- res = -errno;
+ res = posix_mq_open(c, &c->response, response->response,
+ QB_RESPONSE_Q_LEN);
+ if (res != 0) {
perror("mq_open:RESPONSE");
goto cleanup_request;
}
-
- res =
- snprintf(c->u.pmq.event.name, NAME_MAX, "/%s-event-%d",
- c->name, getpid());
-
- posix_mq_increase_limits(c->max_msg_size, 10);
- c->u.pmq.event.q = posix_mq_create(c->u.pmq.event.name,
- c->max_msg_size,
- O_RDONLY | O_CREAT | O_EXCL |
- O_NONBLOCK);
-
- if (c->u.pmq.event.q == (mqd_t)-1) {
- res = -errno;
- perror("mq_open:event");
+ res = posix_mq_open(c, &c->event, response->event, QB_EVENT_Q_LEN);
+ if (res != 0) {
+ perror("mq_open:EVENT");
goto cleanup_request_response;
}
- res = _ipcc_pmq_connect_to_service_(c);
- if (res == 0) {
- return 0;
- }
-
- mq_close(c->u.pmq.event.q);
- mq_unlink(c->u.pmq.event.name);
+ return 0;
cleanup_request_response:
- mq_close(c->u.pmq.response.q);
- mq_unlink(c->u.pmq.response.name);
+ mq_close(c->response.pmq.q);
cleanup_request:
- mq_close(c->u.pmq.request.q);
+ mq_close(c->request.pmq.q);
return res;
}
-
/*
* service functions
* --------------------------------------------------------
@@ -327,9 +266,6 @@ static void qb_ipcs_pmq_destroy(struct qb_ipcs_service *s)
struct qb_ipcs_connection *c = NULL;
struct qb_list_head *iter;
struct qb_list_head *iter_next;
- char mq_name[NAME_MAX];
-
- snprintf(mq_name, NAME_MAX, "/%s", s->name);
qb_util_log(LOG_DEBUG, "%s\n", __func__);
@@ -344,79 +280,60 @@ static void qb_ipcs_pmq_destroy(struct qb_ipcs_service *s)
}
qb_ipcs_disconnect(c);
}
-
- if (mq_close(s->u.q) == -1)
- perror("mq_close");
- if (mq_unlink(mq_name) == -1)
- perror("mq_unlink");
}
static int32_t qb_ipcs_pmq_connect(struct qb_ipcs_service *s,
- struct qb_ipcs_connection *c, void *data,
- size_t size)
+ struct qb_ipcs_connection *c,
+ struct qb_ipc_connection_response *r)
{
- int32_t res;
- struct mar_req_pmq_setup *init = (struct mar_req_pmq_setup *)data;
- struct mar_res_setup accept_msg;
-
- c->pid = init->pid;
- c->service = s;
-
- /* setup the response message queue
- */
- posix_mq_increase_limits(c->service->max_msg_size, 10);
- strcpy(c->u.pmq.response.name, init->response_mq);
- c->u.pmq.response.q = mq_open(c->u.pmq.response.name,
- O_WRONLY | O_NONBLOCK);
- if (c->u.pmq.response.q == (mqd_t)-1) {
- res = -errno;
- return res;
- }
+ int32_t res = 0;
- /* setup the event message queue
- */
- posix_mq_increase_limits(c->service->max_msg_size, 10);
- strcpy(c->u.pmq.event.name, init->event_mq);
- c->u.pmq.event.q = mq_open(c->u.pmq.event.name,
- O_WRONLY | O_NONBLOCK);
+ snprintf(r->request, NAME_MAX, "/%s-request-%d", s->name, c->pid);
+ snprintf(r->response, NAME_MAX, "/%s-response-%d", s->name, c->pid);
+ snprintf(r->event, NAME_MAX, "/%s-event-%d", s->name, c->pid);
- if (c->u.pmq.event.q == (mqd_t)-1) {
- res = -errno;
- goto cleanup_response;
+ res = posix_mq_create(c, &c->request, r->request, QB_REQUEST_Q_LEN);
+ if (res < 0) {
+ goto cleanup;
}
- /* send the "connection accepted" mesage back.
- */
- accept_msg.hdr.id = QB_IPC_MSG_CONNECT;
- accept_msg.hdr.size = sizeof(struct mar_res_setup);
- accept_msg.hdr.error = 0;
- accept_msg.max_msg_size = s->max_msg_size;
+ res = posix_mq_create(c, &c->response, r->response, QB_RESPONSE_Q_LEN);
+ if (res < 0) {
+ goto cleanup_request;
+ }
- res =
- mq_send(c->u.pmq.response.q, (const char *)&accept_msg,
- sizeof(struct mar_res_setup), 30);
- if (res == -1) {
- res = -errno;
- perror("mq_send:RESPONSE");
- goto cleanup_response;
+ res = posix_mq_create(c, &c->event, r->event, QB_EVENT_Q_LEN);
+ if (res < 0) {
+ goto cleanup_request_response;
+ }
+
+ if (!s->needs_sock_for_poll) {
+ qb_poll_dispatch_add(s->poll_handle, c->request.pmq.q,
+ POLLIN | POLLPRI | POLLNVAL,
+ c, qb_ipcs_dispatch_service_request);
}
+ r->hdr.error = 0;
return 0;
-cleanup_response:
- accept_msg.hdr.error = res;
- mq_send(c->u.pmq.response.q, (const char *)&accept_msg,
- sizeof(struct mar_res_setup), 30);
- mq_close(c->u.pmq.response.q);
+cleanup_request_response:
+ mq_close(c->response.pmq.q);
+ mq_unlink(r->response);
+cleanup_request:
+ mq_close(c->request.pmq.q);
+ mq_unlink(r->request);
+
+cleanup:
+ r->hdr.error = res;
return res;
}
-static ssize_t qb_ipcs_pmq_request_recv(struct qb_ipcs_service *s, void *buf,
+static ssize_t qb_ipcs_pmq_request_recv(struct qb_ipcs_connection *c, void *buf,
size_t buf_size)
{
uint32_t msg_prio;
- ssize_t res = mq_receive(s->u.q, buf, buf_size, &msg_prio);
+ ssize_t res = mq_receive(c->request.pmq.q, buf, buf_size, &msg_prio);
if (res == -1) {
return -errno;
}
@@ -433,16 +350,19 @@ static int32_t qb_ipcs_pmq_fd_get(struct qb_ipcs_service *s)
static ssize_t qb_ipcs_pmq_response_send(struct qb_ipcs_connection *c,
void *data, size_t size)
{
- if (mq_send(c->u.pmq.response.q, (const char *)data, size, 1) == -1) {
- return -errno;
+ ssize_t res = mq_send(c->response.pmq.q, (const char *)data, size, 1);
+ if (res == -1) {
+ res = -errno;
+ perror("mq_send");
+ return res;
}
return size;
}
static ssize_t qb_ipcs_pmq_event_send(struct qb_ipcs_connection *c,
- void *data, size_t size)
+ void *data, size_t size)
{
- if (mq_send(c->u.pmq.event.q, (const char *)data, size, 1) == -1) {
+ if (mq_send(c->event.pmq.q, (const char *)data, size, 1) == -1) {
return -errno;
}
return size;
@@ -450,11 +370,6 @@ static ssize_t qb_ipcs_pmq_event_send(struct qb_ipcs_connection *c,
int32_t qb_ipcs_pmq_create(struct qb_ipcs_service * s)
{
- char mq_name[NAME_MAX];
- int32_t res;
-
- snprintf(mq_name, NAME_MAX, "/%s", s->name);
-
s->funcs.destroy = qb_ipcs_pmq_destroy;
s->funcs.request_recv = qb_ipcs_pmq_request_recv;
s->funcs.response_send = qb_ipcs_pmq_response_send;
@@ -465,20 +380,5 @@ int32_t qb_ipcs_pmq_create(struct qb_ipcs_service * s)
#else
s->needs_sock_for_poll = QB_TRUE;
#endif
-
- posix_mq_increase_limits(s->max_msg_size, 10);
- s->u.q = posix_mq_create(mq_name, s->max_msg_size,
- (O_RDONLY | O_CREAT | O_EXCL | O_NONBLOCK));
- if (s->u.q == (mqd_t)-1) {
- res = -errno;
- return res;
- }
- qb_util_log(LOG_DEBUG, "%s() %d", __func__, s->u.q);
-
- if (!s->needs_sock_for_poll) {
- qb_poll_dispatch_add(s->poll_handle, s->u.q,
- POLLIN | POLLPRI | POLLNVAL,
- s, qb_ipcs_dispatch_service_request);
- }
return 0;
}
diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c
index b9002ef..da44f67 100644
--- a/lib/ipc_shm.c
+++ b/lib/ipc_shm.c
@@ -26,7 +26,7 @@
#include <qb/qbrb.h>
static ssize_t qb_ipcs_shm_event_send(struct qb_ipcs_connection *c,
- void *data, size_t size);
+ void *data, size_t size);
/*
* utility functions
@@ -38,22 +38,22 @@ static ssize_t qb_ipcs_shm_event_send(struct qb_ipcs_connection *c,
*/
static void qb_ipcc_shm_disconnect(struct qb_ipcc_connection *c)
{
- qb_rb_close(c->u.shm.request.rb);
- qb_rb_close(c->u.shm.response.rb);
- qb_rb_close(c->u.shm.event.rb);
+ qb_rb_close(c->request.shm.rb);
+ qb_rb_close(c->response.shm.rb);
+ qb_rb_close(c->event.shm.rb);
}
static int32_t qb_ipcc_shm_send(struct qb_ipcc_connection *c,
const void *msg_ptr, size_t msg_len)
{
- return qb_rb_chunk_write(c->u.shm.request.rb, msg_ptr, msg_len);
+ return qb_rb_chunk_write(c->request.shm.rb, msg_ptr, msg_len);
}
static ssize_t qb_ipcc_shm_recv(struct qb_ipcc_connection *c,
void *msg_ptr, size_t msg_len)
{
ssize_t res =
- qb_rb_chunk_read(c->u.shm.response.rb, (void *)msg_ptr, msg_len, 0);
+ qb_rb_chunk_read(c->response.shm.rb, (void *)msg_ptr, msg_len, 0);
if (res == -ETIMEDOUT) {
return -EAGAIN;
}
@@ -61,69 +61,22 @@ static ssize_t qb_ipcc_shm_recv(struct qb_ipcc_connection *c,
}
static ssize_t qb_ipcc_shm_event_recv(struct qb_ipcc_connection *c,
- void **data_out, int32_t timeout)
+ void **data_out, int32_t timeout)
{
- ssize_t res =
- qb_rb_chunk_peek(c->u.shm.event.rb, data_out, timeout);
+ ssize_t res = qb_rb_chunk_peek(c->event.shm.rb, data_out, timeout);
if (res == -ETIMEDOUT) {
return -EAGAIN;
}
return res;
}
-static void qb_ipcc_shm_event_release(struct qb_ipcc_connection* c)
-{
- qb_rb_chunk_reclaim(c->u.shm.event.rb);
-}
-
-static int32_t _ipcc_shm_connect_to_service_(struct qb_ipcc_connection *c)
+static void qb_ipcc_shm_event_release(struct qb_ipcc_connection *c)
{
- int32_t res;
- ssize_t size;
- struct mar_req_shm_setup start;
- struct mar_res_setup *msg_res;
-
- start.hdr.id = QB_IPC_MSG_CONNECT;
- start.hdr.session_id = c->session_id;
- start.pid = getpid();
- start.hdr.size = sizeof(struct mar_req_shm_setup);
- strcpy(start.response, qb_rb_name_get(c->u.shm.response.rb));
- strcpy(start.event, qb_rb_name_get(c->u.shm.event.rb));
-
- c->needs_sock_for_poll = QB_TRUE;
-
- res = qb_rb_chunk_write(c->u.shm.request.rb,
- (const char *)&start,
- start.hdr.size);
- if (res < 0) {
- return res;
- }
-
- if (c->needs_sock_for_poll) {
- qb_ipc_us_send(c->sock, &start, 1);
- }
- qb_util_log(LOG_DEBUG, "sent request to server %d\n", res);
-
- size = qb_rb_chunk_read(c->u.shm.response.rb, c->receive_buf,
- c->max_msg_size, 100000);
-
- if (size < 0) {
- res = size;
- perror("_ipcc_shm_connect_to_service_:qb_rb_chunk_read");
- goto cleanup;
- }
- qb_util_log(LOG_DEBUG, "received response from server size:%zd\n", size);
- msg_res = (struct mar_res_setup *)c->receive_buf;
- res = msg_res->hdr.error;
- if (res == 0) {
- c->max_msg_size = msg_res->max_msg_size;
- }
-
-cleanup:
- return res;
+ qb_rb_chunk_reclaim(c->event.shm.rb);
}
-int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection * c)
+int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection *c,
+ struct qb_ipc_connection_response *response)
{
int32_t res = 0;
@@ -132,65 +85,45 @@ int32_t qb_ipcc_shm_connect(struct qb_ipcc_connection * c)
c->funcs.event_recv = qb_ipcc_shm_event_recv;
c->funcs.event_release = qb_ipcc_shm_event_release;
c->funcs.disconnect = qb_ipcc_shm_disconnect;
+ c->needs_sock_for_poll = QB_TRUE;
if (strlen(c->name) > (NAME_MAX - 20)) {
errno = EINVAL;
- return -1;
+ return -errno;
}
- /* Connect to the service's request message queue.
- */
- c->u.shm.request.rb = qb_rb_open(c->name, c->max_msg_size,
- QB_RB_FLAG_SHARED_PROCESS);
- if (c->u.shm.request.rb == NULL) {
+ c->request.shm.rb = qb_rb_open(response->request, c->max_msg_size,
+ QB_RB_FLAG_SHARED_PROCESS);
+ if (c->request.shm.rb == NULL) {
perror("qb_rb_open:REQUEST");
- return -1;
+ return -errno;
}
+ c->response.shm.rb = qb_rb_open(response->response,
+ c->max_msg_size,
+ QB_RB_FLAG_SHARED_PROCESS);
- /* Create the response message queue.
- */
- res = snprintf(c->u.shm.response.name,
- NAME_MAX, "%s-response-%d", c->name, getpid());
-
- c->u.shm.response.rb = qb_rb_open(c->u.shm.response.name,
- c->max_msg_size,
- QB_RB_FLAG_CREATE |
- QB_RB_FLAG_SHARED_PROCESS);
-
- if (c->u.shm.response.rb == NULL) {
+ if (c->response.shm.rb == NULL) {
perror("qb_rb_open:RESPONSE");
goto cleanup_request;
}
+ c->event.shm.rb = qb_rb_open(response->event,
+ c->max_msg_size,
+ QB_RB_FLAG_SHARED_PROCESS);
- res =
- snprintf(c->u.shm.event.name, NAME_MAX, "%s-event-%d",
- c->name, getpid());
-
- c->u.shm.event.rb = qb_rb_open(c->u.shm.event.name,
- c->max_msg_size,
- QB_RB_FLAG_CREATE |
- QB_RB_FLAG_SHARED_PROCESS);
-
- if (c->u.shm.event.rb == NULL) {
+ if (c->event.shm.rb == NULL) {
res = -errno;
perror("qb_rb_open:EVENT");
goto cleanup_request_response;
}
-
- res = _ipcc_shm_connect_to_service_(c);
- if (res == 0) {
- return 0;
- }
-
- qb_util_log(LOG_DEBUG, "connection failed %d\n", res);
-
- qb_rb_close(c->u.shm.event.rb);
+ return 0;
cleanup_request_response:
- qb_rb_close(c->u.shm.response.rb);
+ qb_rb_close(c->response.shm.rb);
cleanup_request:
- qb_rb_close(c->u.shm.request.rb);
+ qb_rb_close(c->request.shm.rb);
+
+ qb_util_log(LOG_DEBUG, "connection failed %d\n", res);
return res;
}
@@ -208,12 +141,12 @@ static void qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c)
msg.size = sizeof(msg);
msg.error = 0;
- if (c->u.shm.response.rb) {
+ if (c->response.shm.rb) {
qb_ipcs_shm_event_send(c, &msg, msg.size);
- qb_rb_close(c->u.shm.response.rb);
+ qb_rb_close(c->response.shm.rb);
}
- if (c->u.shm.event.rb) {
- qb_rb_close(c->u.shm.event.rb);
+ if (c->event.shm.rb) {
+ qb_rb_close(c->event.shm.rb);
}
}
@@ -236,80 +169,74 @@ static void qb_ipcs_shm_destroy(struct qb_ipcs_service *s)
}
qb_ipcs_disconnect(c);
}
-
- qb_rb_close(s->u.rb);
}
static int32_t qb_ipcs_shm_connect(struct qb_ipcs_service *s,
- struct qb_ipcs_connection *c, void *data,
- size_t size)
+ struct qb_ipcs_connection *c,
+ struct qb_ipc_connection_response *r)
{
int32_t res;
- struct mar_req_shm_setup *init = (struct mar_req_shm_setup *)data;
- struct mar_res_setup accept_msg;
- c->pid = init->pid;
- c->service = s;
qb_util_log(LOG_DEBUG, "connecting to client [%d]\n", c->pid);
- /* setup the response message queue
- */
- strcpy(c->u.shm.response.name, init->response);
- qb_util_log(LOG_DEBUG, "%s:%s", __func__, c->u.shm.response.name);
- c->u.shm.response.rb = qb_rb_open(c->u.shm.response.name,
- s->max_msg_size,
- QB_RB_FLAG_SHARED_PROCESS);
- if (c->u.shm.response.rb == NULL) {
+ snprintf(r->request, NAME_MAX, "%s-request-%d", s->name, c->pid);
+ snprintf(r->response, NAME_MAX, "%s-response-%d", s->name, c->pid);
+ snprintf(r->event, NAME_MAX, "%s-event-%d", s->name, c->pid);
+
+ qb_util_log(LOG_DEBUG, "rb_open:%s", r->request);
+ c->request.shm.rb = qb_rb_open(r->request,
+ c->max_msg_size,
+ QB_RB_FLAG_CREATE |
+ QB_RB_FLAG_SHARED_PROCESS);
+ if (c->request.shm.rb == NULL) {
res = -errno;
- perror("qb_rb_open:RESPONSE");
- return res;
+ perror("qb_rb_open:REQUEST");
+ goto cleanup;
}
+ res = qb_rb_chown(c->request.shm.rb, c->euid, c->egid);
- /* setup the event message queue
- */
- strcpy(c->u.shm.event.name, init->event);
- qb_util_log(LOG_DEBUG, "%s:%s", __func__, c->u.shm.event.name);
- c->u.shm.event.rb = qb_rb_open(c->u.shm.event.name,
- s->max_msg_size,
- QB_RB_FLAG_SHARED_PROCESS);
-
- if (c->u.shm.event.rb == NULL) {
+ c->response.shm.rb = qb_rb_open(r->response,
+ c->max_msg_size,
+ QB_RB_FLAG_CREATE |
+ QB_RB_FLAG_SHARED_PROCESS);
+ if (c->response.shm.rb == NULL) {
res = -errno;
- perror("mq_open:EVENT");
- goto cleanup_response;
+ perror("qb_rb_open:RESPONSE");
+ goto cleanup_request;
}
+ res = qb_rb_chown(c->response.shm.rb, c->euid, c->egid);
- /* send the "connection accepted" message back.
- */
- accept_msg.hdr.id = QB_IPC_MSG_CONNECT;
- accept_msg.hdr.size = sizeof(struct mar_res_setup);
- accept_msg.hdr.error = 0;
- accept_msg.max_msg_size = s->max_msg_size;
-
- qb_util_log(LOG_DEBUG, "%s:sending response", __func__);
- res = qb_rb_chunk_write(c->u.shm.response.rb, (const char *)&accept_msg,
- sizeof(struct mar_res_setup));
- if (res < 0) {
+ c->event.shm.rb = qb_rb_open(r->event,
+ c->max_msg_size,
+ QB_RB_FLAG_CREATE |
+ QB_RB_FLAG_SHARED_PROCESS);
+
+ if (c->event.shm.rb == NULL) {
res = -errno;
- perror("qb_rb_chunk_write:RESPONSE");
- goto cleanup_response;
+ perror("mq_open:EVENT");
+ goto cleanup_request_response;
}
+ res = qb_rb_chown(c->event.shm.rb, c->euid, c->egid);
+ r->hdr.error = 0;
return 0;
-cleanup_response:
- accept_msg.hdr.error = res;
- qb_rb_chunk_write(c->u.shm.response.rb, (const char *)&accept_msg,
- sizeof(struct mar_res_setup));
- qb_rb_close(c->u.shm.response.rb);
+cleanup_request_response:
+ qb_rb_close(c->request.shm.rb);
+
+cleanup_request:
+ qb_rb_close(c->response.shm.rb);
+
+cleanup:
+ r->hdr.error = res;
return res;
}
-static ssize_t qb_ipcs_shm_request_recv(struct qb_ipcs_service *s, void *buf,
+static ssize_t qb_ipcs_shm_request_recv(struct qb_ipcs_connection *c, void *buf,
size_t buf_size)
{
- int32_t res = qb_rb_chunk_read(s->u.rb, buf, buf_size, 0);
+ int32_t res = qb_rb_chunk_read(c->request.shm.rb, buf, buf_size, 0);
if (res == -ETIMEDOUT) {
return -EAGAIN;
}
@@ -319,21 +246,17 @@ static ssize_t qb_ipcs_shm_request_recv(struct qb_ipcs_service *s, void *buf,
static ssize_t qb_ipcs_shm_response_send(struct qb_ipcs_connection *c,
void *data, size_t size)
{
- return qb_rb_chunk_write(c->u.shm.response.rb, (const char *)data,
- size);
+ return qb_rb_chunk_write(c->response.shm.rb, (const char *)data, size);
}
static ssize_t qb_ipcs_shm_event_send(struct qb_ipcs_connection *c,
- void *data, size_t size)
+ void *data, size_t size)
{
- return qb_rb_chunk_write(c->u.shm.event.rb, (const char *)data,
- size);
+ return qb_rb_chunk_write(c->event.shm.rb, (const char *)data, size);
}
int32_t qb_ipcs_shm_create(struct qb_ipcs_service *s)
{
- int32_t res = 0;
-
s->funcs.destroy = qb_ipcs_shm_destroy;
s->funcs.request_recv = qb_ipcs_shm_request_recv;
s->funcs.response_send = qb_ipcs_shm_response_send;
@@ -341,15 +264,5 @@ int32_t qb_ipcs_shm_create(struct qb_ipcs_service *s)
s->funcs.disconnect = qb_ipcs_shm_disconnect;
s->funcs.event_send = qb_ipcs_shm_event_send;
s->needs_sock_for_poll = QB_TRUE;
-
- s->u.rb = qb_rb_open(s->name, s->max_msg_size,
- QB_RB_FLAG_CREATE | QB_RB_FLAG_SHARED_PROCESS);
- if (s->u.rb == NULL) {
- res = -errno;
- perror("qb_rb_open:REQUEST");
- return res;
- }
-
- qb_util_log(LOG_DEBUG, "%s()", __func__);
- return res;
+ return 0;
}
diff --git a/lib/ipc_sysv_mq.c b/lib/ipc_sysv_mq.c
index 6b2b716..652cd7c 100644
--- a/lib/ipc_sysv_mq.c
+++ b/lib/ipc_sysv_mq.c
@@ -26,94 +26,83 @@
#include <qb/qbpoll.h>
#include "ipc_int.h"
#include "util_int.h"
+
#ifndef MSGMAX
#define MSGMAX 8192
#endif
+
#define MY_DATA_SIZE 8000
struct my_msgbuf {
int32_t id __attribute__ ((aligned(8)));
char data[MY_DATA_SIZE] __attribute__ ((aligned(8)));
} __attribute__ ((aligned(8)));
-
static ssize_t qb_ipcs_smq_event_send(struct qb_ipcs_connection *c,
- void *data, size_t size);
+ void *data, size_t size);
/*
* utility functions
* --------------------------------------------------------
*/
-static int32_t sysv_mq_create(struct qb_ipcs_service *s)
+static int32_t sysv_mq_unnamed_create(struct qb_ipcs_connection *c,
+ union qb_ipc_one_way *queue)
{
struct msqid_ds info;
int32_t res = 0;
- s->u.smq.q = msgget(s->u.smq.key, IPC_CREAT | O_EXCL | IPC_NOWAIT |
- S_IWUSR | S_IRUSR | S_IWGRP);
- if (s->u.smq.q == -1) {
- res = -errno;
- perror("msgget:REQUEST");
- return res;
+retry_creating_the_q:
+ queue->smq.key = random();
+ queue->smq.q =
+ msgget(queue->smq.key,
+ IPC_CREAT | IPC_EXCL | IPC_NOWAIT | S_IWUSR | S_IRUSR);
+ if (queue->smq.q == -1 && errno == EEXIST) {
+ goto retry_creating_the_q;
+ } else if (queue->smq.q == -1) {
+ return -errno;
}
- res = msgctl(s->u.smq.q, IPC_STAT, &info);
+ /*
+ * change the queue size and change the ownership to that of
+ * the client so they can access it.
+ */
+ res = msgctl(queue->smq.q, IPC_STAT, &info);
if (res != 0) {
res = -errno;
- perror("msgctl:STAT");
- qb_util_log(LOG_ERR, "error getting mq info");
+ qb_util_log(LOG_ERR, "error getting sysv-mq info : %s",
+ strerror(errno));
return res;
}
if (info.msg_perm.uid != 0) {
- qb_util_log(LOG_WARNING, "not enough privileges to increase msg_qbytes");
+ qb_util_log(LOG_WARNING,
+ "not enough privileges to increase msg_qbytes");
return res;
}
- info.msg_qbytes = 2 * s->max_msg_size;
- res = msgctl(s->u.smq.q, IPC_SET, &info);
+ info.msg_qbytes = 2 * c->max_msg_size;
+ info.msg_perm.uid = c->euid;
+ info.msg_perm.gid = c->egid;
+
+ res = msgctl(queue->smq.q, IPC_SET, &info);
if (res != 0) {
res = -errno;
- perror("msgctl:SET");
- qb_util_log(LOG_ERR, "error changing msg_qbytes to %zu",
- 10 * s->max_msg_size);
+ qb_util_log(LOG_ERR,
+ "error modifing the SYSV message queue : %s",
+ strerror(errno));
+ return res;
}
- return res;
-}
-static int32_t sysv_mq_unnamed_create(struct qb_ipcc_smq_one_way *queue)
-{
-retry_creating_the_q:
- queue->key = random();
- queue->q = msgget(queue->key, IPC_CREAT | IPC_EXCL | IPC_NOWAIT |
- S_IWUSR | S_IRUSR | S_IRGRP);
- if (queue->q == -1 && errno == EEXIST) {
- goto retry_creating_the_q;
- } else if (queue->q == -1) {
- return -errno;
- }
return 0;
}
-static key_t sysv_key_from_name(const char *name)
-{
- char key_location[PATH_MAX];
-
- snprintf(key_location, PATH_MAX, "/tmp/qb_%s.smq", name);
-
- return ftok(key_location, 0);
-}
-
-#define PACK_MESSAGES 1
-
-static int32_t sysv_send(mqd_t q,
- const void *msg_ptr, size_t msg_len)
+static int32_t sysv_send(mqd_t q, const void *msg_ptr, size_t msg_len)
{
int32_t res;
int32_t sent = 0;
#ifdef PACK_MESSAGES
- char *progress = (char*)msg_ptr;
+ char *progress = (char *)msg_ptr;
struct my_msgbuf buf;
- size_t to_send_now; /* to send in this message */
- size_t to_send_next; /* to send in next message */
+ size_t to_send_now; /* to send in this message */
+ size_t to_send_next; /* to send in next message */
do {
to_send_now = QB_MIN(msg_len - sent, MY_DATA_SIZE);
@@ -133,7 +122,7 @@ static int32_t sysv_send(mqd_t q,
} while (sent < msg_len);
- return_status:
+return_status:
#else
res = msgsnd(q, msg_ptr, msg_len, IPC_NOWAIT);
sent = msg_len;
@@ -149,13 +138,12 @@ static ssize_t sysv_recv(mqd_t q, void *msg_ptr, size_t msg_len)
ssize_t res;
ssize_t received = 0;
#ifdef PACK_MESSAGES
- char *progress = (char*)msg_ptr;
+ char *progress = (char *)msg_ptr;
struct my_msgbuf buf;
do {
- try_again:
- res = msgrcv(q, &buf,
- MY_DATA_SIZE, 0, IPC_NOWAIT);
+try_again:
+ res = msgrcv(q, &buf, MY_DATA_SIZE, 0, IPC_NOWAIT);
if (res == -1 && errno == ENOMSG) {
goto try_again;
@@ -168,10 +156,9 @@ static ssize_t sysv_recv(mqd_t q, void *msg_ptr, size_t msg_len)
received += res;
progress += res;
} while (buf.id > 1);
- return_status:
+return_status:
#else
- res = msgrcv(q, msg_ptr, msg_len,
- 0, IPC_NOWAIT);
+ res = msgrcv(q, msg_ptr, msg_len, 0, IPC_NOWAIT);
received = res;
#endif
if (res == -1 && errno == ENOMSG) {
@@ -186,7 +173,6 @@ static ssize_t sysv_recv(mqd_t q, void *msg_ptr, size_t msg_len)
return received;
}
-
/*
* client functions
* --------------------------------------------------------
@@ -194,13 +180,13 @@ static ssize_t sysv_recv(mqd_t q, void *msg_ptr, size_t msg_len)
static int32_t qb_ipcc_smq_send(struct qb_ipcc_connection *c,
const void *msg_ptr, size_t msg_len)
{
- return sysv_send(c->u.smq.request.q, msg_ptr, msg_len);
+ return sysv_send(c->request.smq.q, msg_ptr, msg_len);
}
static ssize_t qb_ipcc_smq_recv(struct qb_ipcc_connection *c,
void *msg_ptr, size_t msg_len)
{
- return sysv_recv(c->u.smq.response.q, msg_ptr, msg_len);
+ return sysv_recv(c->response.smq.q, msg_ptr, msg_len);
}
static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c)
@@ -213,70 +199,17 @@ static void qb_ipcc_smq_disconnect(struct qb_ipcc_connection *c)
}
hdr.id = QB_IPC_MSG_DISCONNECT;
- hdr.session_id = c->session_id;
hdr.size = sizeof(hdr);
- sysv_send(c->u.smq.request.q, (const char *)&hdr, hdr.size);
-
- msgctl(c->u.smq.event.q, IPC_RMID, NULL);
- msgctl(c->u.smq.response.q, IPC_RMID, NULL);
-}
-
-static int32_t _smq_connect_to_service_(struct qb_ipcc_connection *c)
-{
- int32_t res;
- ssize_t size;
- int32_t waited;
- struct mar_req_smq_setup start;
- struct mar_res_setup *msg_res;
-
- start.hdr.id = QB_IPC_MSG_CONNECT;
- start.hdr.session_id = c->session_id;
- start.pid = getpid();
- start.hdr.size = sizeof(struct mar_req_smq_setup);
- start.response_key = c->u.smq.response.key;
- start.event_key = c->u.smq.event.key;
-
- if (c->needs_sock_for_poll) {
- qb_ipc_us_send(c->sock, &start, 1);
- }
- res = sysv_send(c->u.smq.request.q, (const char *)&start,
- start.hdr.size);
-
- if (res == -1) {
- perror("sysv_send");
- return res;
- }
- qb_util_log(LOG_DEBUG, "sent request to server %d\n", res);
-
- waited = 0;
-mq_recv_again:
- size = sysv_recv(c->u.smq.response.q, c->receive_buf,
- c->max_msg_size);
-
- if ((size == -EAGAIN || size == -ENOMSG) && waited < 10) {
- usleep(100000);
- waited++;
- goto mq_recv_again;
- }
- if (size < 0) {
- perror("sysv_recv");
- goto cleanup;
- }
- qb_util_log(LOG_DEBUG, "received response from server %zd\n", size);
- msg_res = (struct mar_res_setup *)c->receive_buf;
- res = msg_res->hdr.error;
- if (res == 0) {
- c->max_msg_size = msg_res->max_msg_size;
- }
+ sysv_send(c->request.smq.q, (const char *)&hdr, hdr.size);
-cleanup:
-
- return res;
+ msgctl(c->event.smq.q, IPC_RMID, NULL);
+ msgctl(c->response.smq.q, IPC_RMID, NULL);
}
-int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection * c)
+int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection *c,
+ struct qb_ipc_connection_response *response)
{
- int32_t res;
+ int32_t res = 0;
c->funcs.send = qb_ipcc_smq_send;
c->funcs.recv = qb_ipcc_smq_recv;
@@ -288,49 +221,31 @@ int32_t qb_ipcc_smq_connect(struct qb_ipcc_connection * c)
return -EINVAL;
}
- /* Connect to the service's request message queue.
- */
- c->u.smq.request.key = sysv_key_from_name(c->name);
- if (c->u.smq.request.key == -1) {
- res = -errno;
- perror("ftok:REQUEST");
- return res;
- }
- c->u.smq.request.q = msgget(c->u.smq.request.key, IPC_NOWAIT);
- if (c->u.smq.request.q == -1) {
+ memcpy(&c->request.smq.key, response->request, sizeof(uint32_t));
+ c->request.smq.q = msgget(c->request.smq.key, IPC_NOWAIT);
+ if (c->request.smq.q == -1) {
res = -errno;
perror("msgget:REQUEST");
- return res;
+ goto cleanup;
}
- /* Create the response message queue.
- */
- res = sysv_mq_unnamed_create(&c->u.smq.response);
- if (res < 0) {
+ memcpy(&c->response.smq.key, response->response, sizeof(uint32_t));
+ c->response.smq.q = msgget(c->response.smq.key, IPC_NOWAIT);
+ if (c->response.smq.q == -1) {
+ res = -errno;
perror("msgget:RESPONSE");
- goto cleanup_request;
- }
-
- /* Create the event message queue.
- */
- res = sysv_mq_unnamed_create(&c->u.smq.event);
- if (res < 0) {
- perror("msgget:event");
- goto cleanup_request_response;
+ goto cleanup;
}
- res = _smq_connect_to_service_(c);
- if (res == 0) {
- return 0;
+ memcpy(&c->event.smq.key, response->event, sizeof(uint32_t));
+ c->event.smq.q = msgget(c->event.smq.key, IPC_NOWAIT);
+ if (c->event.smq.q == -1) {
+ res = -errno;
+ perror("msgget:EVENT");
+ goto cleanup;
}
- msgctl(c->u.smq.event.q, IPC_RMID, NULL);
-
-cleanup_request_response:
- msgctl(c->u.smq.response.q, IPC_RMID, NULL);
-
-cleanup_request:
-
+cleanup:
return res;
}
@@ -372,65 +287,45 @@ static void qb_ipcs_smq_destroy(struct qb_ipcs_service *s)
}
qb_ipcs_disconnect(c);
}
-
- if (msgctl(s->u.smq.q, IPC_RMID, NULL) == -1)
- perror("msgctl:RMID");
}
static int32_t qb_ipcs_smq_connect(struct qb_ipcs_service *s,
- struct qb_ipcs_connection *c, void *data,
- size_t size)
+ struct qb_ipcs_connection *c,
+ struct qb_ipc_connection_response *r)
{
int32_t res = 0;
- struct mar_req_smq_setup *init = (struct mar_req_smq_setup *)data;
- struct mar_res_setup accept_msg;
-
- c->pid = init->pid;
- c->service = s;
-
- accept_msg.hdr.id = QB_IPC_MSG_CONNECT;
- accept_msg.hdr.size = sizeof(struct mar_res_setup);
- /* setup the response message queue
- */
- c->u.smq.response.key = init->response_key;
- c->u.smq.response.q = msgget(c->u.smq.response.key, IPC_NOWAIT);
- if (c->u.smq.response.q == -1) {
+ res = sysv_mq_unnamed_create(c, &c->request);
+ if (res < 0) {
res = -errno;
- perror("msgget:RESPONSE");
goto cleanup;
}
+ memcpy(r->request, &c->request.smq.key, sizeof(int32_t));
- /* setup the event message queue
- */
- c->u.smq.event.key = init->event_key;
- c->u.smq.event.q = msgget(c->u.smq.event.key, IPC_NOWAIT);
- if (c->u.smq.event.q == -1) {
+ res = sysv_mq_unnamed_create(c, &c->response);
+ if (res < 0) {
res = -errno;
- perror("msgget:event");
- goto cleanup_response;
+ goto cleanup_request;
}
+ memcpy(r->response, &c->response.smq.key, sizeof(int32_t));
- /* send the "connection accepted" message back.
- */
- accept_msg.hdr.error = 0;
- accept_msg.max_msg_size = s->max_msg_size;
-
- res = sysv_send(c->u.smq.response.q, (const char *)&accept_msg,
- sizeof(struct mar_res_setup));
+ res = sysv_mq_unnamed_create(c, &c->event);
if (res < 0) {
- perror("sysv_send:RESPONSE");
- goto cleanup_response;
+ res = -errno;
+ goto cleanup_request_response;
}
+ memcpy(r->event, &c->event.smq.key, sizeof(int32_t));
+ r->hdr.error = 0;
return 0;
-cleanup_response:
- accept_msg.hdr.error = res;
- sysv_send(c->u.smq.response.q, (const char *)&accept_msg,
- sizeof(struct mar_res_setup));
+cleanup_request:
+ msgctl(c->request.smq.q, IPC_RMID, NULL);
+cleanup_request_response:
+ msgctl(c->response.smq.q, IPC_RMID, NULL);
cleanup:
+ r->hdr.error = res;
return res;
}
@@ -448,40 +343,31 @@ static int32_t qb_ipcs_smq_is_msg_ready(struct qb_ipcs_service *s)
return -errno;
}
#endif
-static ssize_t qb_ipcs_smq_request_recv(struct qb_ipcs_service *s, void *buf,
+static ssize_t qb_ipcs_smq_request_recv(struct qb_ipcs_connection *c, void *buf,
size_t buf_size)
{
- return sysv_recv(s->u.q, buf, buf_size);
+ return sysv_recv(c->request.smq.q, buf, buf_size);
}
static ssize_t qb_ipcs_smq_response_send(struct qb_ipcs_connection *c,
void *data, size_t size)
{
- return sysv_send(c->u.smq.response.q, (const char *)data, size);
+ return sysv_send(c->response.smq.q, (const char *)data, size);
}
static ssize_t qb_ipcs_smq_event_send(struct qb_ipcs_connection *c,
- void *data, size_t size)
+ void *data, size_t size)
{
- return sysv_send(c->u.smq.event.q, (const char *)data, size);
+ return sysv_send(c->event.smq.q, (const char *)data, size);
}
-int32_t qb_ipcs_smq_create(struct qb_ipcs_service * s)
+int32_t qb_ipcs_smq_create(struct qb_ipcs_service *s)
{
- int32_t fd;
- char key_location[PATH_MAX];
-
- snprintf(key_location, PATH_MAX, "/tmp/qb_%s.smq", s->name);
-
- fd = creat(key_location, 0600);
- s->u.smq.key = ftok(key_location, 0);
-
s->funcs.destroy = qb_ipcs_smq_destroy;
s->funcs.connect = qb_ipcs_smq_connect;
s->funcs.disconnect = qb_ipcs_smq_disconnect;
s->funcs.response_send = qb_ipcs_smq_response_send;
s->funcs.request_recv = qb_ipcs_smq_request_recv;
s->needs_sock_for_poll = QB_TRUE;
-
- return sysv_mq_create(s);
+ return 0;
}
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 1e432c5..61a6d78 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -99,14 +99,13 @@ retry_send:
}
static ssize_t qb_ipc_us_recv_msghdr(int32_t s,
- struct msghdr *hdr, const char *msg,
- size_t len)
+ struct msghdr *hdr, char *msg, size_t len)
{
int32_t result;
int32_t processed = 0;
retry_recv:
- hdr->msg_iov->iov_base = (void *)&msg[processed];
+ hdr->msg_iov->iov_base = &msg[processed];
hdr->msg_iov->iov_len = len - processed;
result = recvmsg(s, hdr, MSG_NOSIGNAL | MSG_WAITALL);
@@ -160,7 +159,7 @@ static int32_t qb_ipcs_uc_recv_and_auth(struct qb_ipcs_connection *c)
int32_t res = 0;
struct msghdr msg_recv;
struct iovec iov_recv;
- char setup_msg[sizeof(struct mar_req_initial_setup)];
+ struct qb_ipc_connection_request setup_msg;
#ifdef QB_LINUX
struct cmsghdr *cmsg;
@@ -184,21 +183,22 @@ static int32_t qb_ipcs_uc_recv_and_auth(struct qb_ipcs_connection *c)
#endif /* QB_SOLARIS */
iov_recv.iov_base = &setup_msg;
- iov_recv.iov_len = sizeof(struct mar_req_initial_setup);
+ iov_recv.iov_len = sizeof(struct qb_ipc_connection_request);
#ifdef QB_LINUX
setsockopt(c->sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
- res = qb_ipc_us_recv_msghdr(c->sock, &msg_recv, setup_msg,
- sizeof(struct mar_req_initial_setup));
+ res = qb_ipc_us_recv_msghdr(c->sock, &msg_recv, (char *)&setup_msg,
+ sizeof(struct qb_ipc_connection_request));
if (res < 0) {
goto cleanup_and_return;
}
- if (res != sizeof(struct mar_req_initial_setup)) {
+ if (res != sizeof(struct qb_ipc_connection_request)) {
res = -EIO;
goto cleanup_and_return;
}
+ c->max_msg_size = setup_msg.max_msg_size;
res = -EBADMSG;
/*
@@ -481,9 +481,7 @@ int32_t qb_ipcs_us_publish(struct qb_ipcs_service * s)
#endif
if (listen(s->server_sock, SERVER_BACKLOG) == -1) {
strerror_r(errno, error_str, 100);
- qb_util_log(LOG_ERR,
- "listen failed: %s.\n",
- error_str);
+ qb_util_log(LOG_ERR, "listen failed: %s.\n", error_str);
}
qb_poll_dispatch_add(s->poll_handle, s->server_sock,
@@ -498,8 +496,7 @@ error_close:
int32_t qb_ipcs_us_withdraw(struct qb_ipcs_service * s)
{
- qb_util_log(LOG_INFO,
- "withdrawing server sockets\n");
+ qb_util_log(LOG_INFO, "withdrawing server sockets\n");
shutdown(s->server_sock, SHUT_RDWR);
close(s->server_sock);
return 0;
@@ -512,7 +509,7 @@ static int32_t qb_ipcs_us_connection_acceptor(qb_handle_t handle,
int32_t new_fd;
struct qb_ipcs_connection *c;
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
- struct mar_res_initial_setup init_res;
+ struct qb_ipc_connection_response response;
int32_t res;
socklen_t addrlen = sizeof(struct sockaddr_un);
char error_str[100];
@@ -529,14 +526,14 @@ retry_accept:
qb_util_log(LOG_ERR,
"Could not accept Library connection:(fd: %d) [%d] %s\n",
fd, errno, error_str);
- return (-1);
+ return -1;
}
if (new_fd == -1) {
strerror_r(errno, error_str, 100);
qb_util_log(LOG_ERR,
"Could not accept Library connection: [%d] %s\n",
errno, error_str);
- return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
+ return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
res = fcntl(new_fd, F_SETFL, O_NONBLOCK);
@@ -546,12 +543,9 @@ retry_accept:
"Could not set non-blocking operation on library connection: %s\n",
error_str);
close(new_fd);
- return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
+ return 0; /* This is an error, but -1 would indicate disconnect from poll loop */
}
- /*
- * Valid accept
- */
c = qb_ipcs_connection_alloc(s);
c->sock = new_fd;
@@ -559,39 +553,45 @@ retry_accept:
if (res == 0) {
qb_util_log(LOG_INFO, "IPC credentials authenticated");
- qb_list_add(&c->list, &s->connections);
+ res = s->funcs.connect(s, c, &response);
+ if (res != 0) {
+ goto send_response;
+ }
- init_res.hdr.id = QB_IPC_MSG_AUTHENTICATE;
- init_res.hdr.size = sizeof(init_res);
- init_res.hdr.error = 0;
- init_res.connection_type = s->type;
- init_res.session_id = c->handle;
- init_res.max_msg_size = s->max_msg_size;
+ qb_list_add(&c->list, &s->connections);
+ c->receive_buf = malloc(c->max_msg_size);
- qb_ipc_us_send(c->sock, &init_res, init_res.hdr.size);
if (s->needs_sock_for_poll) {
qb_poll_dispatch_add(s->poll_handle, c->sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
}
+ }
- } else {
- if (res == -EACCES) {
- qb_util_log(LOG_ERR, "Invalid IPC credentials.");
- } else {
- strerror_r(-res, error_str, 100);
- qb_util_log(LOG_ERR, "Error in conection setup: %s.",
- error_str);
- }
- init_res.hdr.id = QB_IPC_MSG_AUTHENTICATE;
- init_res.hdr.size = sizeof(init_res);
- init_res.hdr.error = res;
- init_res.session_id = 0;
+send_response:
+ response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
+ response.hdr.size = sizeof(response);
+ response.hdr.error = res;
+ response.connection_type = s->type;
+ response.max_msg_size = c->max_msg_size;
+
+ qb_ipc_us_send(c->sock, &response, response.hdr.size);
- qb_ipc_us_send(c->sock, &init_res, init_res.hdr.size);
+ if (res == 0) {
+ if (s->serv_fns.connection_created) {
+ s->serv_fns.connection_created(c->handle);
+ }
+ } else if (res == -EACCES) {
+ qb_util_log(LOG_ERR, "Invalid IPC credentials.");
+ } else {
+ strerror_r(-response.hdr.error, error_str, 100);
+ qb_util_log(LOG_ERR, "Error in conection setup: %s.",
+ error_str);
+ }
+ if (res != 0) {
qb_ipcs_disconnect(c);
}
- return (0);
+ return 0;
}
diff --git a/lib/ipcc.c b/lib/ipcc.c
index dd01ecb..33619d5 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -25,13 +25,13 @@
#include "util_int.h"
#include <qb/qbipcc.h>
-qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
+qb_ipcc_connection_t *qb_ipcc_connect(const char *name, size_t max_msg_size)
{
int32_t res;
int32_t usock;
qb_ipcc_connection_t *c = NULL;
- struct mar_req_initial_setup init_req;
- struct mar_res_initial_setup init_res;
+ struct qb_ipc_connection_request request;
+ struct qb_ipc_connection_response response;
res = qb_ipcc_us_connect(name, &usock);
if (res != 0) {
@@ -40,9 +40,10 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
return NULL;
}
- init_req.hdr.id = QB_IPC_MSG_AUTHENTICATE;
- init_req.hdr.size = sizeof(init_req);
- res = qb_ipc_us_send(usock, &init_req, init_req.hdr.size);
+ request.hdr.id = QB_IPC_MSG_AUTHENTICATE;
+ request.hdr.size = sizeof(request);
+ request.max_msg_size = max_msg_size;
+ res = qb_ipc_us_send(usock, &request, request.hdr.size);
if (res < 0) {
perror("qb_ipc_us_send");
qb_ipcc_us_disconnect(usock);
@@ -50,7 +51,7 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
return NULL;
}
- res = qb_ipc_us_recv(usock, &init_res, sizeof(init_res));
+ res = qb_ipc_us_recv(usock, &response, sizeof(response));
if (res < 0) {
perror("qb_ipc_us_recv");
qb_ipcc_us_disconnect(usock);
@@ -58,8 +59,8 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
return NULL;
}
- if (init_res.hdr.error != 0) {
- errno = -init_res.hdr.error;
+ if (response.hdr.error != 0) {
+ errno = -response.hdr.error;
perror("recv:message");
return NULL;
}
@@ -69,21 +70,24 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
return NULL;
}
strcpy(c->name, name);
- c->type = init_res.connection_type;
+ c->type = response.connection_type;
c->sock = usock;
- c->session_id = init_res.session_id;
- c->max_msg_size = init_res.max_msg_size;
+
+ qb_util_log(LOG_DEBUG, "%s() max_msg_size:%d actual:%d", __func__,
+ max_msg_size, response.max_msg_size);
+
+ c->max_msg_size = response.max_msg_size;
c->receive_buf = malloc(c->max_msg_size);
switch (c->type) {
case QB_IPC_SHM:
- res = qb_ipcc_shm_connect(c);
+ res = qb_ipcc_shm_connect(c, &response);
break;
case QB_IPC_POSIX_MQ:
- res = qb_ipcc_pmq_connect(c);
+ res = qb_ipcc_pmq_connect(c, &response);
break;
case QB_IPC_SYSV_MQ:
- res = qb_ipcc_smq_connect(c);
+ res = qb_ipcc_smq_connect(c, &response);
break;
case QB_IPC_SOCKET:
c->needs_sock_for_poll = QB_FALSE;
@@ -93,6 +97,7 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
break;
}
if (res != 0) {
+ qb_ipcc_us_disconnect(usock);
free(c);
c = NULL;
errno = -res;
@@ -103,15 +108,12 @@ qb_ipcc_connection_t *qb_ipcc_connect(const char *name)
int32_t qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr,
size_t msg_len)
{
- struct qb_ipc_request_header *hdr = NULL;
ssize_t res;
if (msg_len > c->max_msg_size) {
return -EINVAL;
}
- hdr = (struct qb_ipc_request_header *)msg_ptr;
- hdr->session_id = c->session_id;
res = c->funcs.send(c, msg_ptr, msg_len);
if (res > 0 && c->needs_sock_for_poll) {
qb_ipc_us_send(c->sock, msg_ptr, 1);
@@ -125,17 +127,18 @@ ssize_t qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr,
return c->funcs.recv(c, msg_ptr, msg_len);
}
-int32_t qb_ipcc_fd_get(struct qb_ipcc_connection* c, int32_t * fd)
+int32_t qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd)
{
if (c->needs_sock_for_poll) {
*fd = c->sock;
} else {
- *fd = 0; /*TODO??*/
+ *fd = 0; /*TODO?? */
}
return 0;
}
-int32_t qb_ipcc_event_recv(struct qb_ipcc_connection* c, void **data_out, int32_t timeout)
+int32_t qb_ipcc_event_recv(struct qb_ipcc_connection * c, void **data_out,
+ int32_t timeout)
{
char one_byte = 1;
@@ -145,7 +148,7 @@ int32_t qb_ipcc_event_recv(struct qb_ipcc_connection* c, void **data_out, int32_
return c->funcs.event_recv(c, data_out, timeout);
}
-void qb_ipcc_event_release(struct qb_ipcc_connection* c)
+void qb_ipcc_event_release(struct qb_ipcc_connection *c)
{
c->funcs.event_release(c);
}
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 239e0b8..2f9c1ed 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -30,8 +30,7 @@ static void qb_ipcs_disconnect_internal(void *data);
QB_HDB_DECLARE(qb_ipc_services, qb_ipcs_destroy_internal);
QB_HDB_DECLARE(qb_ipc_connections, qb_ipcs_disconnect_internal);
-qb_ipcs_service_pt qb_ipcs_create(const char *name, enum qb_ipc_type type,
- size_t max_msg_size)
+qb_ipcs_service_pt qb_ipcs_create(const char *name, enum qb_ipc_type type)
{
struct qb_ipcs_service *s;
qb_ipcs_service_pt h;
@@ -42,25 +41,11 @@ qb_ipcs_service_pt qb_ipcs_create(const char *name, enum qb_ipc_type type,
s->pid = getpid();
s->type = type;
- s->max_msg_size = max_msg_size;
- s->receive_buf = malloc(s->max_msg_size);
s->needs_sock_for_poll = QB_FALSE;
qb_list_init(&s->connections);
- snprintf(s->name, 255, "%s", name);
+ snprintf(s->name, NAME_MAX, "%s", name);
- switch (s->type) {
- case QB_IPC_SOCKET:
- case QB_IPC_POSIX_MQ:
- case QB_IPC_SYSV_MQ:
- case QB_IPC_SHM:
- break;
- default:
- qb_hdb_handle_destroy(&qb_ipc_services, h);
- errno = EINVAL;
- h = 0;
- break;
- }
qb_hdb_handle_put(&qb_ipc_services, h);
return h;
@@ -145,10 +130,10 @@ static void qb_ipcs_destroy_internal(void *data)
{
struct qb_ipcs_service *s = (struct qb_ipcs_service *)data;
s->funcs.destroy(s);
- free(s->receive_buf);
}
-ssize_t qb_ipcs_response_send(qb_ipcs_connection_handle_t c, void *data, size_t size)
+ssize_t qb_ipcs_response_send(qb_ipcs_connection_handle_t c, void *data,
+ size_t size)
{
ssize_t res;
struct qb_ipcs_connection *con;
@@ -163,7 +148,8 @@ ssize_t qb_ipcs_response_send(qb_ipcs_connection_handle_t c, void *data, size_t
return res;
}
-ssize_t qb_ipcs_event_send(qb_ipcs_connection_handle_t c, void *data, size_t size)
+ssize_t qb_ipcs_event_send(qb_ipcs_connection_handle_t c, void *data,
+ size_t size)
{
ssize_t res;
struct qb_ipcs_connection *con;
@@ -199,6 +185,7 @@ struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
c->egid = -1;
c->sock = -1;
qb_list_init(&c->list);
+ c->receive_buf = NULL;
return c;
}
@@ -214,50 +201,37 @@ static void qb_ipcs_disconnect_internal(void *data)
}
c->service->funcs.disconnect(c);
qb_ipcc_us_disconnect(c->sock);
+ if (c->receive_buf) {
+ free(c->receive_buf);
+ }
}
void qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
+ qb_util_log(LOG_DEBUG, "%s()", __func__);
if (qb_hdb_handle_destroy(&qb_ipc_connections, c->handle) != 0)
perror("qb_ipcs_disconnect:destroy");
if (qb_hdb_handle_put(&qb_ipc_connections, c->handle) != 0)
perror("qb_ipcs_disconnect:put");
}
-static int32_t _process_request_(struct qb_ipcs_service *s)
+static int32_t _process_request_(struct qb_ipcs_connection *c)
{
int32_t res = 0;
- struct qb_ipcs_connection *c = NULL;
struct qb_ipc_request_header *hdr;
- hdr = (struct qb_ipc_request_header *)s->receive_buf;
+ hdr = (struct qb_ipc_request_header *)c->receive_buf;
get_msg_with_live_connection:
- res = s->funcs.request_recv(s, hdr, s->max_msg_size);
+ res = c->service->funcs.request_recv(c, hdr, c->max_msg_size);
if (res == -EAGAIN) {
goto get_msg_with_live_connection;
}
if (res < 0) {
goto cleanup;
}
- if (qb_hdb_handle_get
- (&qb_ipc_connections, hdr->session_id, (void **)&c) != 0) {
- qb_util_log(LOG_DEBUG,
- "%s dropping message for expired connection.",
- __func__);
- goto get_msg_with_live_connection;
- }
switch (hdr->id) {
- case QB_IPC_MSG_CONNECT:
- qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_CONNECT", __func__);
- if (s->funcs.connect(s, c, hdr, hdr->size) == 0) {
- if (s->serv_fns.connection_created) {
- s->serv_fns.connection_created(c->handle);
- }
- }
- break;
-
case QB_IPC_MSG_DISCONNECT:
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func__);
qb_ipcs_disconnect(c);
@@ -266,11 +240,10 @@ get_msg_with_live_connection:
case QB_IPC_MSG_NEW_MESSAGE:
default:
- s->serv_fns.msg_process(c->handle, hdr, hdr->size);
+ c->service->serv_fns.msg_process(c->handle, hdr, hdr->size);
break;
}
cleanup:
- qb_hdb_handle_put(&qb_ipc_connections, hdr->session_id);
return res;
}
@@ -278,7 +251,7 @@ int32_t qb_ipcs_dispatch_service_request(qb_handle_t handle,
int32_t fd, int32_t revents,
void *data)
{
- return _process_request_((struct qb_ipcs_service *)data);
+ return _process_request_((struct qb_ipcs_connection *)data);
}
int32_t qb_ipcs_dispatch_connection_request(qb_handle_t handle,
@@ -297,5 +270,5 @@ int32_t qb_ipcs_dispatch_connection_request(qb_handle_t handle,
qb_ipc_us_recv(c->sock, &one_byte, 1);
}
- return _process_request_(c->service);
+ return _process_request_(c);
}
diff --git a/lib/util.c b/lib/util.c
index 469ac68..7f8737d 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -174,15 +174,21 @@ int32_t qb_util_mmap_file_open(char *path, const char *file, size_t bytes,
if (fd < 0 && !is_absolute) {
qb_util_log(LOG_ERR, "couldn't open file %s error: %s",
path, strerror(errno));
+
snprintf(path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
fd = open_mmap_file(path, file_flags);
if (fd < 0) {
+ qb_util_log(LOG_ERR, "couldn't open file %s error: %s",
+ path, strerror(errno));
return -errno;
}
}
if (fd >= 0) {
ftruncate(fd, bytes);
+ } else {
+ qb_util_log(LOG_ERR, "couldn't open file %s error: %s",
+ path, strerror(errno));
}
return fd;
}
diff --git a/tests/bmc.c b/tests/bmc.c
index 9a14a10..8213096 100644
--- a/tests/bmc.c
+++ b/tests/bmc.c
@@ -116,6 +116,9 @@ repeat_send:
if (res == -EINTR) {
return -1;
}
+ if (res < 0) {
+ perror("qb_ipcc_recv");
+ }
assert(res == sizeof(struct qb_ipc_response_header));
assert(res_header.id == 13);
assert(res_header.size == sizeof(struct qb_ipc_response_header));
@@ -149,7 +152,7 @@ static void libqb_log_writer(const char *file_name,
int32_t file_line,
int32_t severity, const char *msg)
{
- printf("libqb: %s:%d %s\n", file_name, file_line, msg);
+ printf("libqb: %s:%d [%d] %s\n", file_name, file_line, severity, msg);
}
int32_t main(int32_t argc, char *argv[])
@@ -182,7 +185,7 @@ int32_t main(int32_t argc, char *argv[])
signal(SIGINT, sigterm_handler);
signal(SIGILL, sigterm_handler);
signal(SIGTERM, sigterm_handler);
- conn = qb_ipcc_connect("bm1");
+ conn = qb_ipcc_connect("bm1", MAX_MSG_SIZE);
if (conn == NULL) {
perror("qb_ipcc_connect");
exit(1);
diff --git a/tests/bmcpt.c b/tests/bmcpt.c
index 4a07b55..3afb1db 100644
--- a/tests/bmcpt.c
+++ b/tests/bmcpt.c
@@ -32,6 +32,7 @@
#include <signal.h>
#define ITERATIONS 10000000
+#define THREADS 4
struct bm_ctx {
qb_ipcc_connection_t *conn;
@@ -82,7 +83,7 @@ static void bm_finish(struct bm_ctx *ctx, const char *operation, int32_t size)
static void bmc_connect(struct bm_ctx *ctx)
{
- ctx->conn = qb_ipcc_connect("bm1");
+ ctx->conn = qb_ipcc_connect("bm1", 1000 * (100 + THREADS));
}
static void bmc_disconnect(struct bm_ctx *ctx)
@@ -155,7 +156,6 @@ static void *benchmark(void *ctx)
}
}
-#define THREADS 4
int32_t main(void)
{
diff --git a/tests/bms.c b/tests/bms.c
index 8486526..0aa8c23 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -180,7 +180,7 @@ int32_t main(int32_t argc, char *argv[])
bms_poll_handle = qb_poll_create();
- s1 = qb_ipcs_create("bm1", ipc_type, 8192*64);
+ s1 = qb_ipcs_create("bm1", ipc_type);
if (s1 == 0) {
perror("qb_ipcs_create");
exit(1);
--
1.7.2.3