Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
lib/ipc_int.h | 1 -
lib/ipcc.c | 74 ++++++++++++------------------------
lib/ipcs.c | 116 ++++++++++++++++++---------------------------------------
tests/bmc.c | 38 ++++++++++++++++--
tests/bms.c | 28 ++++++++++++--
5 files changed, 118 insertions(+), 139 deletions(-)
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 446ecbc..41b9dcc 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -58,7 +58,6 @@ struct control_buffer {
unsigned int read;
unsigned int write;
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_t sem0;
sem_t sem1;
sem_t sem2;
#endif
diff --git a/lib/ipcc.c b/lib/ipcc.c
index f082f5c..259f4be 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -27,6 +27,7 @@
#include <qb/qbipcc.h>
#include <qb/qbhdb.h>
+#include <qb/qbrb.h>
#include "ipc_int.h"
#include "util_int.h"
@@ -48,7 +49,7 @@ struct ipc_instance {
#endif
int flow_control_state;
struct control_buffer *control_buffer;
- char *request_buffer;
+ qb_ringbuffer_t * request_rb;
char *response_buffer;
char *dispatch_buffer;
size_t control_size;
@@ -313,7 +314,7 @@ void ipc_hdb_destructor(void *context)
* << 1 (or multiplied by 2) because this is a wrapped memory buffer
*/
memory_unmap(ipc_instance->control_buffer, ipc_instance->control_size);
- memory_unmap(ipc_instance->request_buffer, ipc_instance->request_size);
+ qb_rb_close(ipc_instance->request_rb);
memory_unmap(ipc_instance->response_buffer,
ipc_instance->response_size);
memory_unmap(ipc_instance->dispatch_buffer,
@@ -375,50 +376,27 @@ static int32_t
msg_send(struct ipc_instance *ipc_instance,
const struct iovec *iov, unsigned int iov_len)
{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
-
- int i;
- int res;
- int req_buffer_idx = 0;
+ int32_t i;
+ size_t size = 0;
+ char *chunk_pt;
for (i = 0; i < iov_len; i++) {
- if ((req_buffer_idx + iov[i].iov_len) >
- ipc_instance->request_size) {
- return (EINVAL);
+ if ((size + iov[i].iov_len) > ipc_instance->request_size) {
+ errno = EINVAL;
+ return -1;
}
- memcpy(&ipc_instance->request_buffer[req_buffer_idx],
- iov[i].iov_base, iov[i].iov_len);
- req_buffer_idx += iov[i].iov_len;
+ size += iov[i].iov_len;
}
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post(&ipc_instance->control_buffer->sem0);
- if (res == -1) {
- return EBADE;
+ chunk_pt = qb_rb_chunk_alloc(ipc_instance->request_rb, size);
+ if (chunk_pt == NULL) {
+ errno = ENOMEM;
+ return -1;
}
-#else
- /*
- * Signal semaphore #0 indicting a new message from client
- * to server request queue
- */
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop(ipc_instance->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- return (EAGAIN);
- } else if (res == -1 && errno == EACCES) {
- priv_change_send(ipc_instance);
- goto retry_semop;
- } else if (res == -1) {
- return (EBADE);
+ for (i = 0; i < iov_len; i++) {
+ memcpy(chunk_pt, iov[i].iov_base, iov[i].iov_len);
+ chunk_pt += iov[i].iov_len;
}
-#endif
- return 0;
+ return qb_rb_chunk_commit(ipc_instance->request_rb, size);
}
static int32_t ipc_sem_wait(struct ipc_instance *ipc_instance, int sem_num)
@@ -434,9 +412,6 @@ static int32_t ipc_sem_wait(struct ipc_instance *ipc_instance, int
sem_num)
#if _POSIX_THREAD_PROCESS_SHARED > 0
switch (sem_num) {
- case 0:
- sem = &ipc_instance->control_buffer->sem0;
- break;
case 1:
sem = &ipc_instance->control_buffer->sem1;
break;
@@ -608,13 +583,15 @@ qb_ipcc_service_connect(const char *socket_name,
goto error_connect;
}
- res = memory_map(request_map_path,
- "request_buffer-XXXXXX",
- (void *)&ipc_instance->request_buffer, request_size);
- if (res == -1) {
+ /* RB request */
+ ipc_instance->request_rb = qb_rb_open("request_ringbuffer-XXXXXX",
+ request_size,
+ QB_RB_FLAG_CREATE|QB_RB_FLAG_SHARED_PROCESS);
+ if (ipc_instance->request_rb == NULL) {
res = EBADE;
goto error_request_buffer;
}
+ strcpy(request_map_path, qb_rb_name_get(ipc_instance->request_rb));
res = memory_map(response_map_path,
"response_buffer-XXXXXX",
@@ -633,7 +610,6 @@ qb_ipcc_service_connect(const char *socket_name,
goto error_dispatch_buffer;
}
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_init(&ipc_instance->control_buffer->sem0, 1, 0);
sem_init(&ipc_instance->control_buffer->sem1, 1, 0);
sem_init(&ipc_instance->control_buffer->sem2, 1, 0);
#else
@@ -728,7 +704,7 @@ error_exit:
error_dispatch_buffer:
memory_unmap(ipc_instance->response_buffer, response_size);
error_response_buffer:
- memory_unmap(ipc_instance->request_buffer, request_size);
+ qb_rb_close(ipc_instance->request_rb);
error_request_buffer:
memory_unmap(ipc_instance->control_buffer, 8192);
error_connect:
diff --git a/lib/ipcs.c b/lib/ipcs.c
index ad99f1f..8f9cf8f 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -36,6 +36,7 @@
#include <qb/qblist.h>
#include <qb/qbhdb.h>
#include <qb/qbipcs.h>
+#include <qb/qbrb.h>
#include "ipc_int.h"
#include "util_int.h"
@@ -110,7 +111,7 @@ struct conn_info {
unsigned int pending_semops;
pthread_mutex_t mutex;
struct control_buffer *control_buffer;
- char *request_buffer;
+ qb_ringbuffer_t *request_rb;
char *response_buffer;
char *dispatch_buffer;
size_t control_size;
@@ -158,34 +159,12 @@ static void dummy_stats_increment_value(qb_hdb_handle_t handle,
{
}
+/*
+ * Just rite some junk to the ringbuffer to kick it out of a sem_wait
+ */
static void sem_post_exit_thread(struct conn_info *conn_info)
{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semop:
- res = sem_post(&conn_info->control_buffer->sem0);
- if (res == -1 && errno == EINTR) {
- api->stats_increment_value(conn_info->stats_handle,
- "sem_retry_count");
- goto retry_semop;
- }
-#else
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop(conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value(conn_info->stats_handle,
- "sem_retry_count");
- goto retry_semop;
- }
-#endif
+ qb_rb_chunk_write(conn_info->request_rb, conn_info, 4);
}
static int memory_map(const char *path, size_t bytes, void **buf)
@@ -459,7 +438,6 @@ static inline int conn_info_destroy(struct conn_info *conn_info)
pthread_mutex_unlock(&conn_info->mutex);
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_destroy(&conn_info->control_buffer->sem0);
sem_destroy(&conn_info->control_buffer->sem1);
sem_destroy(&conn_info->control_buffer->sem2);
#else
@@ -470,8 +448,7 @@ static inline int conn_info_destroy(struct conn_info *conn_info)
*/
res =
munmap((void *)conn_info->control_buffer, conn_info->control_size);
- res =
- munmap((void *)conn_info->request_buffer, conn_info->request_size);
+ qb_rb_close(conn_info->request_rb);
res =
munmap((void *)conn_info->response_buffer,
conn_info->response_size);
@@ -513,14 +490,14 @@ static void *serveraddr2void(uint64_t server_addr)
return (u.server_ptr);
};
-static inline void zerocopy_operations_process(struct conn_info *conn_info,
- qb_ipc_request_header_t **
- header_out,
- unsigned int *new_message)
+static void zerocopy_operations_process(struct conn_info *conn_info,
+ qb_ipc_request_header_t **
+ header_out,
+ unsigned int *new_message)
{
qb_ipc_request_header_t *header;
- header = (qb_ipc_request_header_t *) conn_info->request_buffer;
+ header = *header_out;
if (header->id == ZC_ALLOC_HEADER) {
mar_req_qb_ipcc_zc_alloc_t *hdr =
(mar_req_qb_ipcc_zc_alloc_t *) header;
@@ -572,14 +549,11 @@ static inline void zerocopy_operations_process(struct conn_info
*conn_info,
static void *pthread_ipc_consumer(void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
qb_ipc_request_header_t *header;
- qb_ipc_response_header_t qb_ipc_response_header;
- int send_ok;
- unsigned int new_message;
+ qb_ipc_response_header_t response_header;
+ int32_t send_ok;
+ uint32_t new_message;
+ ssize_t size;
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
if (api->sched_policy != 0) {
@@ -590,44 +564,25 @@ static void *pthread_ipc_consumer(void *conn)
#endif
for (;;) {
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semwait:
- res = sem_wait(&conn_info->control_buffer->sem0);
- if (ipc_thread_active(conn_info) == 0) {
- qb_ipcs_refcount_dec(conn_info);
- pthread_exit(0);
- }
- if ((res == -1) && (errno == EINTR)) {
- api->stats_increment_value(conn_info->stats_handle,
- "sem_retry_count");
- goto retry_semwait;
- }
-#else
+ size = qb_rb_chunk_peek(conn_info->request_rb, (void**)&header, 2000);
- sop.sem_num = 0;
- sop.sem_op = -1;
- sop.sem_flg = 0;
-retry_semop:
- res = semop(conn_info->semid, &sop, 1);
if (ipc_thread_active(conn_info) == 0) {
+ qb_util_log(LOG_DEBUG,"thread not active");
qb_ipcs_refcount_dec(conn_info);
pthread_exit(0);
}
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+ if (size <= 0) {
api->stats_increment_value(conn_info->stats_handle,
"sem_retry_count");
- goto retry_semop;
- } else if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- qb_ipcs_refcount_dec(conn_info);
- pthread_exit(0);
+ continue;
}
-#endif
zerocopy_operations_process(conn_info, &header, &new_message);
/*
* There is no new message to process, continue for loop
*/
if (new_message == 0) {
+ qb_rb_chunk_reclaim(conn_info->request_rb);
continue;
}
@@ -643,33 +598,36 @@ retry_semop:
* parameter, such as an invalid size
*/
if (send_ok == -1) {
- qb_ipc_response_header.size =
+ response_header.size =
sizeof(qb_ipc_response_header_t);
- qb_ipc_response_header.id = 0;
- qb_ipc_response_header.error = EINVAL;
+ response_header.id = 0;
+ response_header.error = EINVAL;
qb_ipcs_response_send(conn_info,
- &qb_ipc_response_header,
+ &response_header,
sizeof(qb_ipc_response_header_t));
+ qb_rb_chunk_reclaim(conn_info->request_rb);
} else if (send_ok) {
api->serialize_lock();
api->stats_increment_value(conn_info->stats_handle,
"requests");
api->handler_fn_get(conn_info->service,
header->id) (conn_info, header);
+ qb_rb_chunk_reclaim(conn_info->request_rb);
api->serialize_unlock();
} else {
/*
- * Overload, tell library to retry
+ * Overload, don't call qb_rb_chunk_reclaim()
*/
api->stats_increment_value(conn_info->stats_handle,
"sem_retry_count");
- qb_ipc_response_header.size =
+ response_header.size =
sizeof(qb_ipc_response_header_t);
- qb_ipc_response_header.id = 0;
- qb_ipc_response_header.error = EAGAIN;
+ response_header.id = 0;
+ response_header.error = EAGAIN;
qb_ipcs_response_send(conn_info,
- &qb_ipc_response_header,
+ &response_header,
sizeof(qb_ipc_response_header_t));
+ qb_rb_chunk_reclaim(conn_info->request_rb);
}
api->
@@ -993,7 +951,6 @@ void qb_ipcs_ipc_exit(void)
ipc_disconnect(conn_info);
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_destroy(&conn_info->control_buffer->sem0);
sem_destroy(&conn_info->control_buffer->sem1);
sem_destroy(&conn_info->control_buffer->sem2);
#else
@@ -1005,8 +962,7 @@ void qb_ipcs_ipc_exit(void)
*/
res = munmap((void *)conn_info->control_buffer,
conn_info->control_size);
- res = munmap((void *)conn_info->request_buffer,
- conn_info->request_size);
+ qb_rb_close(conn_info->request_rb);
res = munmap((void *)conn_info->response_buffer,
conn_info->response_size);
res = circular_memory_unmap(conn_info->dispatch_buffer,
@@ -1624,9 +1580,9 @@ int qb_ipcs_handler_dispatch(int fd, int revent, void *context)
(void *)&conn_info->control_buffer);
conn_info->control_size = req_setup->control_size;
- res = memory_map(req_setup->request_file,
+ conn_info->request_rb = qb_rb_open(req_setup->request_file,
req_setup->request_size,
- (void *)&conn_info->request_buffer);
+ QB_RB_FLAG_SHARED_PROCESS);
conn_info->request_size = req_setup->request_size;
res = memory_map(req_setup->response_file,
diff --git a/tests/bmc.c b/tests/bmc.c
index 8ba1a4b..93d2db7 100644
--- a/tests/bmc.c
+++ b/tests/bmc.c
@@ -28,6 +28,7 @@
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
+#include <signal.h>
int blocking = 1;
int verbose = 0;
@@ -118,8 +119,13 @@ repeat_send:
} else {
res = qb_ipcc_msg_send(bmc_ipc_handle, iov, 2);
}
- if (res != 0) {
- goto repeat_send;
+ if (res == -1) {
+ if (errno == ENOMEM) {
+ goto repeat_send;
+ } else {
+ printf("qb_ipcc_msg_send: %d(%s)\n", res, strerror(res));
+ goto repeat_send;
+ }
}
}
@@ -138,11 +144,28 @@ static void show_usage(const char *name)
printf("\n");
}
+static void sigterm_handler(int num)
+{
+ printf("writer: %s(%d)\n", __func__, num);
+ qb_ipcc_service_disconnect(bmc_ipc_handle);
+ exit(0);
+}
+
+static void libqb_log_writer(const char *file_name,
+ int32_t file_line,
+ int32_t severity, const char *msg)
+{
+ printf("libqb: %s:%d %s\n", file_name, file_line, msg);
+}
+
int main(int argc, char *argv[])
{
const char *options = "nvh";
int opt;
int i, j;
+ size_t size;
+
+ qb_util_set_log_function(libqb_log_writer);
while ((opt = getopt(argc, argv, options)) != -1) {
switch (opt) {
@@ -160,17 +183,22 @@ int main(int argc, char *argv[])
}
}
+ signal(SIGINT, sigterm_handler);
+ signal(SIGILL, sigterm_handler);
+ signal(SIGTERM, sigterm_handler);
bmc_connect();
ops_fp = fopen("opsec", "w");
mbs_fp = fopen("mbsec", "w");
- for (j = 1; j < 499; j++) {
+ for (j = 1; j < 49; j++) {
+ size = 10 * j * j;
bm_start();
for (i = 0; i < ITERATIONS; i++) {
- bmc_send_nozc(1000 * j);
+ bmc_send_nozc(size);
}
- bm_finish("send_nozc", 1000 * j);
+ bm_finish("send_nozc", size);
}
+ qb_ipcc_service_disconnect(bmc_ipc_handle);
return EXIT_SUCCESS;
}
diff --git a/tests/bms.c b/tests/bms.c
index 7fa8ccc..7a775e3 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -66,7 +66,13 @@ static void bms_benchmark_one_fn(void *conn, const void *msg)
{
qb_ipc_response_header_t res;
- if (blocking) {
+ if (verbose) {
+ printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
+ }
+ res.size = sizeof(qb_ipc_response_header_t);
+ res.id = 0;
+ res.error = 0;
+ if (blocking == 1) {
qb_ipcs_response_send(conn, &res, sizeof(res));
}
}
@@ -80,6 +86,9 @@ static void bms_benchmark_two_fn(void *conn, const void *msg)
qb_ipc_response_header_t res;
struct iovec iovec[2];
+ if (verbose) {
+ printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
+ }
res.size =
req->size - sizeof(qb_ipc_request_header_t) +
@@ -98,11 +107,17 @@ static void bms_benchmark_two_fn(void *conn, const void *msg)
static int bms_lib_init_fn(void *conn)
{
+ if (verbose) {
+ printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
+ }
return (0);
}
static int bms_lib_exit_fn(void *conn)
{
+ if (verbose) {
+ printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
+ }
return (0);
}
@@ -110,7 +125,7 @@ static struct lib_handler bms_lib_engine_one[] = {
{ /* entry 0 */
.lib_handler_fn = bms_benchmark_one_fn,
},
- { /* entry 0 */
+ { /* entry 1 */
.lib_handler_fn = bms_benchmark_two_fn,
}
};
@@ -188,7 +203,7 @@ static void bms_sending_allowed_release(void
*sending_allowed_private_data)
static void ipc_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
- fprintf(stderr, "%s:%d [%d] %s", file_name, file_line, severity, msg);
+ fprintf(stderr, "%s:%d [%d] %s\n", file_name, file_line, severity, msg);
}
static void ipc_fatal_error(const char *error_msg)
@@ -267,6 +282,9 @@ struct qb_ipcs_init_state ipc_init_state = {
static void sigusr1_handler(int num)
{
+ printf("%s(%d)\n", __func__, num);
+ qb_ipcs_ipc_exit();
+ exit(0);
}
static void show_usage(const char *name)
@@ -302,7 +320,9 @@ int main(int argc, char *argv[])
break;
}
}
- signal(SIGUSR1, sigusr1_handler);
+ signal(SIGINT, sigusr1_handler);
+ signal(SIGILL, sigusr1_handler);
+ signal(SIGTERM, sigusr1_handler);
qb_util_set_log_function(ipc_log_fn);
--
1.6.6.1