Signed-off-by: Angus Salkeld <asalkeld@redhat.com> --- examples/.gitignore | 2 + examples/Makefile.am | 12 ++- examples/ipcclient.c | 83 +++++++++++++ examples/ipcserver.c | 332 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 428 insertions(+), 1 deletions(-) create mode 100644 examples/ipcclient.c create mode 100644 examples/ipcserver.c
diff --git a/examples/.gitignore b/examples/.gitignore index 861a32c..bf73ae4 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -1,3 +1,5 @@ simplelog tcpclient tcpserver +ipcclient +ipcserver diff --git a/examples/Makefile.am b/examples/Makefile.am index 867f9d8..f7dee65 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -21,7 +21,7 @@ MAINTAINERCLEANFILES = Makefiles.in EXTRA_DIST = CLEANFILES =
-noinst_PROGRAMS = simplelog tcpclient tcpserver +noinst_PROGRAMS = simplelog tcpclient tcpserver ipcclient ipcserver
simplelog_SOURCES = simplelog.c $(top_builddir)/include/qb/qblog.h simplelog_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include @@ -35,3 +35,13 @@ tcpserver_SOURCES = tcpserver.c $(top_builddir)/include/qb/qbloop.h tcpserver_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include tcpserver_LDADD = -lrt $(top_builddir)/lib/libqb.la
+ipcclient_SOURCES = ipcclient.c $(top_builddir)/include/qb/qbloop.h \ + $(top_builddir)/include/qb/qbipcc.h +ipcclient_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include +ipcclient_LDADD = -lrt $(top_builddir)/lib/libqb.la + +ipcserver_SOURCES = ipcserver.c $(top_builddir)/include/qb/qbloop.h \ + $(top_builddir)/include/qb/qbipcs.h +ipcserver_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include \ + $(GLIB_CFLAGS) +ipcserver_LDADD = -lrt $(top_builddir)/lib/libqb.la $(GLIB_LIBS) diff --git a/examples/ipcclient.c b/examples/ipcclient.c new file mode 100644 index 0000000..c43998f --- /dev/null +++ b/examples/ipcclient.c @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2011 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Angus Salkeld asalkeld@redhat.com + * + * libqb is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 2.1 of the License, or + * (at your option) any later version. + * + * libqb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with libqb. If not, see http://www.gnu.org/licenses/. 
+ */
+#include "os_base.h"
+#include <signal.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qbutil.h>
+#include <qb/qbipcc.h>
+
+#define MAX_MSG_SIZE (8192)
+
+/*
+ * Interactive example client.
+ *
+ * Connects to the "ipcserver" service, then repeatedly reads one line from
+ * stdin, sends it as a request and prints the reply.  Typing "q" or "Q",
+ * or reaching end-of-file on stdin, disconnects and exits.
+ *
+ * Returns EXIT_SUCCESS after a clean disconnect; exits with status 1 when
+ * the connection cannot be established.
+ */
+int
+main(int argc, char *argv[])
+{
+	/*
+	 * A single buffer is reused for the outgoing request and the incoming
+	 * response.  The union provides MAX_MSG_SIZE bytes of storage that is
+	 * suitably aligned for both header structs (a "char *" array would be
+	 * an array of pointers rather than a byte buffer).
+	 */
+	union {
+		struct qb_ipc_request_header req;
+		struct qb_ipc_response_header res;
+		char bytes[MAX_MSG_SIZE];
+	} msg;
+	char *req_text = msg.bytes + sizeof(struct qb_ipc_request_header);
+	char *res_text = msg.bytes + sizeof(struct qb_ipc_response_header);
+	const size_t req_text_max = sizeof(msg.bytes) - sizeof(struct qb_ipc_request_header);
+	qb_ipcc_connection_t *conn;
+	int32_t res;
+
+	conn = qb_ipcc_connect("ipcserver", MAX_MSG_SIZE);
+	if (conn == NULL) {
+		perror("qb_ipcc_connect");
+		exit(1);
+	}
+
+	while (1) {
+		printf("SEND (q or Q to quit) : ");
+		fflush(stdout);
+
+		/* fgets() bounds the read; leave the loop on EOF instead of spinning. */
+		if (fgets(req_text, (int)req_text_max, stdin) == NULL) {
+			break;
+		}
+		/* fgets() keeps the newline; the message text should not carry it. */
+		req_text[strcspn(req_text, "\n")] = '\0';
+
+		if (strcmp(req_text, "q") == 0 || strcmp(req_text, "Q") == 0) {
+			break;
+		}
+
+		/* The size field counts the header, the text and its terminating NUL. */
+		msg.req.id = QB_IPC_MSG_USER_START + 3;
+		msg.req.size = sizeof(struct qb_ipc_request_header) + strlen(req_text) + 1;
+		res = qb_ipcc_send(conn, &msg.req, msg.req.size);
+		if (res < 0) {
+			perror("qb_ipcc_send");
+		}
+		if (res <= 0) {
+			/* Nothing was sent, so there is no reply to wait for. */
+			continue;
+		}
+
+		/* Keep one byte spare so the reply can always be NUL-terminated. */
+		res = qb_ipcc_recv(conn, msg.bytes, MAX_MSG_SIZE - 1, -1);
+		if (res < 0) {
+			perror("qb_ipcc_recv");
+			continue;
+		}
+		if ((size_t)res < sizeof(struct qb_ipc_response_header)) {
+			fprintf(stderr, "short response (%d bytes)\n", res);
+			continue;
+		}
+		msg.bytes[res] = '\0';
+
+		printf("Response[%d]: %s \n", msg.res.id, res_text);
+	}
+
+	qb_ipcc_disconnect(conn);
+	return EXIT_SUCCESS;
+}
+
diff --git a/examples/ipcserver.c b/examples/ipcserver.c
new file mode 100644
index 0000000..f26143a
--- /dev/null
+++ b/examples/ipcserver.c
@@ -0,0 +1,332 @@
+/*
+ * Copyright (c) 2006-2009 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake sdake@redhat.com
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see http://www.gnu.org/licenses/.
+ */
+#include "os_base.h"
+#include <signal.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qbutil.h>
+#include <qb/qblog.h>
+#include <qb/qbloop.h>
+#include <qb/qbipcs.h>
+
+#ifdef HAVE_GLIB
+#include <glib.h>
+static GMainLoop *glib_loop;
+/* Per-fd adaptor records (struct gio_to_qb_poll), indexed by descriptor. */
+static qb_array_t *gio_map;
+#endif /* HAVE_GLIB */
+
+/* Set by the -g option: drive the service from a glib main loop. */
+static int32_t use_glib = QB_FALSE;
+/* libqb main loop, used when glib is not selected. */
+static qb_loop_t *bms_loop;
+/* The one IPC service this example publishes. */
+static qb_ipcs_service_t* s1;
+
+/*
+ * connection_accept handler: currently returns 0 for every client.
+ *
+ * NOTE(review): presumably 0 means "accept" here; the disabled branch
+ * returns 1 for an authenticated peer, which looks inconsistent with
+ * that - confirm against the qb_ipcs documentation before enabling it.
+ */
+static int32_t
+s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
+{
+#if 0
+	if (uid == 0 && gid == 0) {
+		qb_log(LOG_INFO, "Authenticated connection");
+		return 1;
+	}
+	qb_log(LOG_NOTICE, "BAD user!");
+	return 0;
+#else
+	return 0;
+#endif
+}
+
+/*
+ * connection_created handler: logs the service-wide active/closed
+ * connection counters.
+ */
+static void
+s1_connection_created_fn(qb_ipcs_connection_t *c)
+{
+	struct qb_ipcs_stats srv_stats;
+
+	qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+	qb_log(LOG_INFO, "Connection created (active:%d, closed:%d)",
+	       srv_stats.active_connections,
+	       srv_stats.closed_connections);
+}
+
+/*
+ * connection_destroyed handler: only logs that the connection is going away.
+ */
+static void
+s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
+{
+	qb_log(LOG_INFO, "Connection about to be freed");
+}
+
+/*
+ * connection_closed handler: logs the service counters and the
+ * per-connection statistics of the connection being closed.
+ * Always returns 0.
+ */
+static int32_t
+s1_connection_closed_fn(qb_ipcs_connection_t *c)
+{
+	struct qb_ipcs_connection_stats stats;
+	struct qb_ipcs_stats srv_stats;
+
+	qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+	qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
+	qb_log(LOG_INFO, "Connection to pid:%d destroyed (active:%d, closed:%d)",
+	       stats.client_pid,
+	       srv_stats.active_connections,
+	       srv_stats.closed_connections);
+
+	qb_log(LOG_DEBUG, " Requests %"PRIu64"", stats.requests);
+	qb_log(LOG_DEBUG, " Responses %"PRIu64"", stats.responses);
+	qb_log(LOG_DEBUG, " Events %"PRIu64"", stats.events);
+	qb_log(LOG_DEBUG, " Send retries %"PRIu64"", stats.send_retries);
+	qb_log(LOG_DEBUG, " Recv retries %"PRIu64"", stats.recv_retries);
+	qb_log(LOG_DEBUG, " FC state %d", stats.flow_control_state);
+	qb_log(LOG_DEBUG, " FC count %"PRIu64"", stats.flow_control_count);
+	return 0;
+}
+
+/*
+ * msg_process handler: answers every request with the text
+ * "ACK <size> bytes", where <size> is the size passed to this handler.
+ *
+ * The reply is sent as two iovecs (header, then NUL-terminated text) and
+ * the header's size field covers both, mirroring how the example client
+ * fills in its request header.  Always returns 0; a send failure is only
+ * logged.
+ */
+static int32_t
+s1_msg_process_fn(qb_ipcs_connection_t *c,
+		void *data, size_t size)
+{
+	struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
+	struct qb_ipc_response_header response;
+	ssize_t res;
+	struct iovec iov[2];
+	char resp[100];
+	size_t resp_len;
+
+	qb_log(LOG_DEBUG, "msg received (id:%d, size:%d)",
+	       req_pt->id, req_pt->size);
+
+	snprintf(resp, sizeof(resp), "ACK %zu bytes", size);
+	/* Send the terminating NUL too so the text is a complete C string. */
+	resp_len = strlen(resp) + 1;
+
+	response.size = sizeof(struct qb_ipc_response_header) + resp_len;
+	response.id = 13;
+	response.error = 0;
+
+	iov[0].iov_len = sizeof(response);
+	iov[0].iov_base = &response;
+	iov[1].iov_len = resp_len;
+	iov[1].iov_base = resp;
+
+	res = qb_ipcs_response_sendv(c, iov, 2);
+	if (res < 0) {
+		qb_perror(LOG_ERR, "qb_ipcs_response_send");
+	}
+	return 0;
+}
+
+/*
+ * Signal handler (installed for SIGINT by main, despite its name):
+ * destroys the service and exits the process.
+ *
+ * NOTE(review): logging and tearing the service down from signal context
+ * is unlikely to be async-signal-safe; tolerable for an example, but worth
+ * moving into the main loop.
+ */
+static void
+sigusr1_handler(int32_t num)
+{
+	qb_log(LOG_DEBUG, "(%d)", num);
+	qb_ipcs_destroy(s1);
+	exit(0);
+}
+
+/*
+ * Prints the command line help; name is the program name to display.
+ */
+static void
+show_usage(const char *name)
+{
+	printf("usage: \n");
+	printf("%s <options>\n", name);
+	printf("\n");
+	printf(" options:\n");
+	printf("\n");
+	printf(" -h show this help text\n");
+	printf(" -m use shared memory\n");
+	printf(" -p use posix message queues\n");
+	printf(" -s use sysv message queues\n");
+	printf(" -u use unix sockets\n");
+	printf(" -g use glib mainloop\n");
+	printf("\n");
+}
+
+#ifdef HAVE_GLIB
+/*
+ * Book-keeping for one descriptor that libqb asked us to poll through glib.
+ */
+struct gio_to_qb_poll {
+	int32_t is_used;		/* slot currently describes a live watch */
+	GIOChannel *channel;		/* channel wrapping the descriptor */
+	guint source;			/* id from g_io_add_watch(), 0 when none */
+	int32_t events;
+	void * data;			/* opaque argument handed back to fn */
+	qb_ipcs_dispatch_fn_t fn;	/* libqb callback to run on activity */
+	enum qb_loop_priority p;
+};
+
+/*
+ * glib watch callback: forwards the event to the libqb dispatch function.
+ * The watch is kept while that function returns 0 and dropped otherwise.
+ */
+static gboolean
+gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data)
+{
+	struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+	gint fd = g_io_channel_unix_get_fd(gio);
+
+	if (adaptor->fn(fd, condition, adaptor->data) == 0) {
+		return TRUE;
+	}
+	/*
+	 * Returning FALSE makes glib remove the watch itself; forget the id
+	 * so my_g_dispatch_del() does not try to remove it a second time.
+	 */
+	adaptor->source = 0;
+	return FALSE;
+}
+
+/*
+ * dispatch_add for the glib loop: records the callback for fd in gio_map
+ * and installs a glib IO watch for it.
+ *
+ * Returns 0 on success, -EEXIST when fd is already being watched, -ENOMEM
+ * when the channel cannot be created, or the error from the qb_array calls.
+ */
+static int32_t
+my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		void *data, qb_ipcs_dispatch_fn_t fn)
+{
+	struct gio_to_qb_poll *adaptor;
+	GIOChannel *channel;
+	int32_t res = 0;
+
+	res = qb_array_grow(gio_map, fd + 1);
+	if (res < 0) {
+		return res;
+	}
+	res = qb_array_index(gio_map, fd, (void**)&adaptor);
+	if (res < 0) {
+		return res;
+	}
+	if (adaptor->is_used) {
+		return -EEXIST;
+	}
+
+	channel = g_io_channel_unix_new(fd);
+	if (!channel) {
+		return -ENOMEM;
+	}
+
+	adaptor->channel = channel;
+	adaptor->fn = fn;
+	adaptor->events = evts;
+	adaptor->data = data;
+	adaptor->p = p;
+	adaptor->is_used = QB_TRUE;
+
+	/* Keep the source id: it is needed to cancel the watch later. */
+	adaptor->source = g_io_add_watch(channel, evts, gio_read_socket, adaptor);
+	return 0;
+}
+
+/*
+ * dispatch_mod for the glib loop: not implemented, reports success.
+ */
+static int32_t
+my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		void *data, qb_ipcs_dispatch_fn_t fn)
+{
+	return 0;
+}
+
+/*
+ * dispatch_del for the glib loop: cancels the watch on fd (if it is still
+ * installed), drops our channel reference and frees the slot for reuse.
+ * Slots that are not in use are left alone.  Always returns 0.
+ */
+static int32_t
+my_g_dispatch_del(int32_t fd)
+{
+	struct gio_to_qb_poll *adaptor;
+
+	if (qb_array_index(gio_map, fd, (void**)&adaptor) != 0) {
+		return 0;
+	}
+	if (!adaptor->is_used) {
+		return 0;
+	}
+	/* Without this the callback would keep firing for a freed slot. */
+	if (adaptor->source != 0) {
+		g_source_remove(adaptor->source);
+		adaptor->source = 0;
+	}
+	g_io_channel_unref(adaptor->channel);
+	adaptor->channel = NULL;
+	adaptor->is_used = QB_FALSE;
+	return 0;
+}
+#endif /* HAVE_GLIB */
+
+/* job_add for the libqb loop: queues the job on bms_loop. */
+static int32_t
+my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
+{
+	return qb_loop_job_add(bms_loop, p, data, fn);
+}
+
+/* dispatch_add for the libqb loop: starts polling fd on bms_loop. */
+static int32_t
+my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		void *data, qb_ipcs_dispatch_fn_t fn)
+{
+	return qb_loop_poll_add(bms_loop, p, fd, evts, data, fn);
+}
+
+/* dispatch_mod for the libqb loop: updates the poll entry for fd. */
+static int32_t
+my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		void *data, qb_ipcs_dispatch_fn_t fn)
+{
+	return qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn);
+}
+
+/* dispatch_del for the libqb loop: stops polling fd on bms_loop. */
+static int32_t
+my_dispatch_del(int32_t fd)
+{
+	return qb_loop_poll_del(bms_loop, fd);
+}
+
+/*
+ * Example IPC server.
+ *
+ * Parses the transport (-m/-p/-s/-u) and main loop (-g) options, sets up
+ * logging to stderr, creates the "ipcserver" service and then runs either
+ * the libqb loop or, with -g, the glib main loop.
+ *
+ * Exits with status 1 when the service or its main loop cannot be created
+ * and returns EXIT_FAILURE when -g is given but glib support was not
+ * compiled in.
+ */
+int32_t
+main(int32_t argc, char *argv[])
+{
+	const char *options = "mpsugh";
+	int32_t opt;
+	int32_t rc = EXIT_SUCCESS;
+	enum qb_ipc_type ipc_type = QB_IPC_SHM;
+	struct qb_ipcs_service_handlers sh = {
+		.connection_accept = s1_connection_accept_fn,
+		.connection_created = s1_connection_created_fn,
+		.msg_process = s1_msg_process_fn,
+		.connection_destroyed = s1_connection_destroyed_fn,
+		.connection_closed = s1_connection_closed_fn,
+	};
+	struct qb_ipcs_poll_handlers ph = {
+		.job_add = my_job_add,
+		.dispatch_add = my_dispatch_add,
+		.dispatch_mod = my_dispatch_mod,
+		.dispatch_del = my_dispatch_del,
+	};
+#ifdef HAVE_GLIB
+	struct qb_ipcs_poll_handlers glib_ph = {
+		.job_add = NULL, /* FIXME */
+		.dispatch_add = my_g_dispatch_add,
+		.dispatch_mod = my_g_dispatch_mod,
+		.dispatch_del = my_g_dispatch_del,
+	};
+#endif /* HAVE_GLIB */
+
+	while ((opt = getopt(argc, argv, options)) != -1) {
+		switch (opt) {
+		case 'm':
+			ipc_type = QB_IPC_SHM;
+			break;
+		case 's':
+			ipc_type = QB_IPC_SYSV_MQ;
+			break;
+		case 'u':
+			ipc_type = QB_IPC_SOCKET;
+			break;
+		case 'p':
+			ipc_type = QB_IPC_POSIX_MQ;
+			break;
+		case 'g':
+			use_glib = QB_TRUE;
+			break;
+		case 'h':
+		default:
+			show_usage(argv[0]);
+			exit(0);
+			break;
+		}
+	}
+	signal(SIGINT, sigusr1_handler);
+
+	qb_log_init("ipcserver", LOG_USER, LOG_WARNING);
+	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FILE, __FILE__,
+			  LOG_DEBUG);
+	qb_log_format_set(QB_LOG_STDERR, "%f:%l [%p] %b");
+	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+
+	s1 = qb_ipcs_create("ipcserver", 0, ipc_type, &sh);
+	if (s1 == NULL) {
+		qb_perror(LOG_ERR, "qb_ipcs_create");
+		exit(1);
+	}
+	if (!use_glib) {
+		bms_loop = qb_loop_create();
+		if (bms_loop == NULL) {
+			qb_perror(LOG_ERR, "qb_loop_create");
+			exit(1);
+		}
+		qb_ipcs_poll_handlers_set(s1, &ph);
+		qb_ipcs_run(s1);
+		qb_loop_run(bms_loop);
+	} else {
+#ifdef HAVE_GLIB
+		glib_loop = g_main_loop_new(NULL, FALSE);
+		gio_map = qb_array_create(64, sizeof(struct gio_to_qb_poll));
+		if (gio_map == NULL) {
+			qb_perror(LOG_ERR, "qb_array_create");
+			exit(1);
+		}
+		qb_ipcs_poll_handlers_set(s1, &glib_ph);
+		qb_ipcs_run(s1);
+		g_main_loop_run(glib_loop);
+#else
+		qb_log(LOG_ERR, "You don't seem to have glib-devel installed.\n");
+		/* The requested mode is unavailable: do not report success. */
+		rc = EXIT_FAILURE;
+#endif
+	}
+	return rc;
+}