sanlock_reg_lockspace() returns an fd that the caller should
use in poll(2), and when ready should call sanlock_get_callback()
to read the callback.
Currently, the only callback type is host message. When a host
receives a message from another host, the host message is sent
to the callback fd.
Signed-off-by: David Teigland <teigland(a)redhat.com>
---
src/client.c | 80 +++++++++++++++++++++++++++++++++++
src/cmd.c | 31 ++++++++++++++
src/lockspace.c | 48 ++++++++++++++++++++-
src/lockspace.h | 7 ++--
src/main.c | 1 +
src/resource.c | 51 ++++++++++++++++++++++-
src/resource.h | 2 +-
src/sanlock.h | 19 +++++++++
src/sanlock_admin.h | 4 ++
src/sanlock_internal.h | 1 +
src/sanlock_sock.h | 3 ++
tests/sanlk_cb.c | 110 +++++++++++++++++++++++++++++++++++++++++++++++++
12 files changed, 350 insertions(+), 7 deletions(-)
create mode 100644 tests/sanlk_cb.c
diff --git a/src/client.c b/src/client.c
index 01d56b22706d..9e53f8b7aa2d 100644
--- a/src/client.c
+++ b/src/client.c
@@ -844,6 +844,86 @@ int sanlock_set_message(const char *ls_name, uint32_t flags,
return rv;
}
+int sanlock_reg_lockspace(struct sanlk_lockspace *ls, uint32_t flags)
+{
+ struct sm_header h;
+ int rv, fd;
+
+ if (!ls->name[0])
+ return -EINVAL;
+
+ rv = connect_socket(&fd);
+ if (rv < 0)
+ return rv;
+
+ rv = send_header(fd, SM_CMD_REG_LOCKSPACE, flags, 0, 0, 0);
+ if (rv < 0) {
+ close(fd);
+ return rv;
+ }
+
+ rv = send(fd, ls, sizeof(struct sanlk_lockspace), 0);
+ if (rv < 0) {
+ close(fd);
+ return rv;
+ }
+
+ memset(&h, 0, sizeof(struct sm_header));
+
+ rv = recv(fd, &h, sizeof(h), MSG_WAITALL);
+ if (rv < 0) {
+ rv = -errno;
+ close(fd);
+ return rv;
+ }
+
+ if (rv != sizeof(h)) {
+ rv = -1;
+ close(fd);
+ return rv;
+ }
+
+ rv = (int)h.data;
+ if (rv < 0) {
+ close(fd);
+ return rv;
+ }
+
+ return fd;
+}
+
+int sanlock_get_callback(int fd, uint32_t flags GNUC_UNUSED,
+ struct sanlk_callback *cb, int cb_size)
+{
+ struct sm_header h;
+ struct sanlk_callback callback;
+ int rv, len;
+
+ rv = recv(fd, &h, sizeof(h), MSG_WAITALL);
+ if (rv < 0)
+ return -errno;
+
+ if (rv != sizeof(h))
+ return -1;
+
+ len = h.length - sizeof(h);
+
+ if (len > sizeof(callback))
+ return -EINVAL;
+
+ memset(&callback, 0, sizeof(callback));
+
+ rv = recv(fd, &callback, len, MSG_WAITALL);
+ if (rv < 0)
+ return -errno;
+
+ if (len > cb_size)
+ len = cb_size;
+
+ memcpy(cb, &callback, len);
+ return 0;
+}
+
/* old api */
int sanlock_init(struct sanlk_lockspace *ls,
struct sanlk_resource *res,
diff --git a/src/cmd.c b/src/cmd.c
index 727d782e4764..a22fec5f8b6c 100644
--- a/src/cmd.c
+++ b/src/cmd.c
@@ -2443,6 +2443,33 @@ static void cmd_version(int ci GNUC_UNUSED, int fd, struct
sm_header *h_recv)
send(fd, h_recv, sizeof(struct sm_header), MSG_NOSIGNAL);
}
+static void cmd_reg_lockspace(int fd, struct sm_header *h_recv)
+{
+ struct sm_header h;
+ struct sanlk_lockspace lockspace;
+ int rv;
+
+ log_debug("cmd_reg_lockspace fd %d", fd);
+
+ memcpy(&h, h_recv, sizeof(struct sm_header));
+ h.version = SM_PROTO;
+ h.length = sizeof(struct sm_header);
+
+ rv = recv(fd, &lockspace, sizeof(struct sanlk_lockspace), MSG_WAITALL);
+ if (rv != sizeof(struct sanlk_lockspace)) {
+ h.data = -ENOTCONN;
+ goto out;
+ }
+
+ rv = register_lockspace_fd(&lockspace, fd);
+
+ h.data = rv;
+ h.data2 = fd;
+out:
+ log_debug("cmd_reg_lockspace fd %d rv %d", fd, rv);
+ send(fd, &h, sizeof(struct sm_header), MSG_NOSIGNAL);
+}
+
static int get_peer_pid(int fd, int *pid)
{
struct ucred cred;
@@ -2531,6 +2558,10 @@ void call_cmd_daemon(int ci, struct sm_header *h_recv, int
client_maxi)
strcpy(client[ci].owner_name, "get_hosts");
cmd_get_hosts(fd, h_recv);
break;
+ case SM_CMD_REG_LOCKSPACE:
+ strcpy(client[ci].owner_name, "reg_lockspace");
+ cmd_reg_lockspace(fd, h_recv);
+ break;
};
if (auto_close)
diff --git a/src/lockspace.c b/src/lockspace.c
index c57497fa4a03..28181a475e07 100644
--- a/src/lockspace.c
+++ b/src/lockspace.c
@@ -111,6 +111,23 @@ int lockspace_info(const char *space_name, struct space_info *spi)
return rv;
}
+int lockspace_callback_fd(int space_id)
+{
+ struct space *sp;
+ int fd = -ENOENT;
+
+ pthread_mutex_lock(&spaces_mutex);
+ list_for_each_entry(sp, &spaces, list) {
+ if (sp->space_id != space_id)
+ continue;
+ fd = sp->callback_fd;
+ break;
+ }
+ pthread_mutex_unlock(&spaces_mutex);
+
+ return fd;
+}
+
int lockspace_disk(char *space_name, struct sync_disk *disk)
{
struct space *sp;
@@ -371,7 +388,7 @@ void check_other_leases(struct space *sp, char *buf)
(unsigned long long)hs->owner_generation,
hm.msg, hm.seq);
- set_message_examine(sp->space_name, &hm,
+ set_message_examine(sp->space_id, &hm,
hs->owner_id, hs->owner_generation);
@@ -670,6 +687,9 @@ static void *lockspace_thread(void *arg_in)
if (opened)
close(sp->host_id_disk.fd);
+ if (sp->callback_fd != -1)
+ close(sp->callback_fd);
+
close_task_aio(&task);
return NULL;
}
@@ -705,6 +725,7 @@ int add_lockspace_start(struct sanlk_lockspace *ls, uint32_t
io_timeout, struct
sp->host_id_disk.fd = -1;
sp->host_id = ls->host_id;
sp->io_timeout = io_timeout;
+ sp->callback_fd = -1;
pthread_mutex_init(&sp->mutex, NULL);
pthread_mutex_lock(&spaces_mutex);
@@ -1246,6 +1267,31 @@ int set_lockspace_message(struct sanlk_lockspace *ls, struct
sanlk_host_message
return 0;
}
+int register_lockspace_fd(struct sanlk_lockspace *ls, int fd)
+{
+ struct space *sp;
+
+ if (!ls->name[0])
+ return -EINVAL;
+
+ pthread_mutex_lock(&spaces_mutex);
+ sp = _search_space(ls->name, NULL, 0, &spaces, NULL, NULL, NULL);
+ if (!sp) {
+ pthread_mutex_unlock(&spaces_mutex);
+ return -ENOENT;
+ }
+ pthread_mutex_unlock(&spaces_mutex);
+
+ pthread_mutex_lock(&sp->mutex);
+ if (sp->callback_fd != -1)
+ close(sp->callback_fd);
+ sp->callback_fd = dup(fd);
+ log_space(sp, "register cb fd %d from fd %d", sp->callback_fd, fd);
+ pthread_mutex_unlock(&sp->mutex);
+
+ return 0;
+}
+
/*
* we call stop_host_id() when all pids are gone and we're in a safe state, so
* it's safe to unlink the watchdog right away here. We want to sp the unlink
diff --git a/src/lockspace.h b/src/lockspace.h
index 0189d00fd524..2b6996ee2ff7 100644
--- a/src/lockspace.h
+++ b/src/lockspace.h
@@ -6,8 +6,8 @@
* of the GNU General Public License v2 or (at your option) any later version.
*/
-#ifndef __HOST_ID_H__
-#define __HOST_ID__H__
+#ifndef __LOCKSPACE_H__
+#define __LOCKSPACE_H__
struct space *find_lockspace(const char *name);
int _lockspace_info(const char *space_name, struct space_info *spi);
@@ -29,6 +29,7 @@ int get_lockspaces(char *buf, int *len, int *count, int maxlen);
int get_hosts(struct sanlk_lockspace *ls, char *buf, int *len, int *count, int maxlen);
void host_message_from_extra(struct sanlk_host_message *hm, struct delta_extra *extra);
int set_lockspace_message(struct sanlk_lockspace *ls, struct sanlk_host_message *hm);
-
+int register_lockspace_fd(struct sanlk_lockspace *ls, int fd);
+int lockspace_callback_fd(int space_id);
#endif
diff --git a/src/main.c b/src/main.c
index a693652a85fc..279c95214368 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1178,6 +1178,7 @@ static void process_connection(int ci)
case SM_CMD_LOG_DUMP:
case SM_CMD_GET_LOCKSPACES:
case SM_CMD_GET_HOSTS:
+ case SM_CMD_REG_LOCKSPACE:
call_cmd_daemon(ci, &h, client_maxi);
break;
case SM_CMD_ADD_LOCKSPACE:
diff --git a/src/resource.c b/src/resource.c
index 3c499b5afeda..467b72c3977b 100644
--- a/src/resource.c
+++ b/src/resource.c
@@ -22,8 +22,11 @@
#include <signal.h>
#include <sys/types.h>
#include <sys/time.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include "sanlock_internal.h"
+#include "sanlock_sock.h"
#include "diskio.h"
#include "log.h"
#include "paxos_lease.h"
@@ -1697,13 +1700,13 @@ int set_resource_examine(char *space_name, char *res_name)
struct recv_message {
struct list_head list;
+ uint32_t space_id;
uint64_t from_host_id;
uint64_t from_generation;
struct sanlk_host_message hm;
};
-void set_message_examine(char *space_name GNUC_UNUSED,
- struct sanlk_host_message *hm,
+void set_message_examine(uint32_t space_id, struct sanlk_host_message *hm,
uint64_t from_host_id, uint64_t from_generation)
{
struct recv_message *rm;
@@ -1716,6 +1719,7 @@ void set_message_examine(char *space_name GNUC_UNUSED,
memset(rm, 0, sizeof(struct recv_message));
memcpy(&rm->hm, hm, sizeof(struct sanlk_host_message));
+ rm->space_id = space_id;
rm->from_host_id = from_host_id;
rm->from_generation = from_generation;
@@ -1726,6 +1730,47 @@ void set_message_examine(char *space_name GNUC_UNUSED,
pthread_mutex_unlock(&resource_mutex);
}
+static void send_callback(struct recv_message *rm)
+{
+ struct sm_header h;
+ struct sanlk_callback_hm cb;
+ int fd, rv;
+
+ fd = lockspace_callback_fd(rm->space_id);
+ if (fd < 0)
+ return;
+
+ memset(&h, 0, sizeof(h));
+ memset(&cb, 0, sizeof(cb));
+
+ h.magic = SM_MAGIC;
+ h.version = SM_CB_PROTO;
+ h.cmd = SM_CMD_CALLBACK;
+ h.length = sizeof(h) + sizeof(cb);
+
+ cb.type = SANLK_CB_HOST_MESSAGE;
+ cb.from_host_id = rm->from_host_id;
+ cb.from_generation = rm->from_generation;
+ cb.host_id = rm->hm.host_id;
+ cb.generation = rm->hm.generation;
+ cb.msg = rm->hm.msg;
+ cb.seq = rm->hm.seq;
+
+ rv = send(fd, &h, sizeof(h), MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (rv == -EAGAIN) {
+ log_error("close callback fd %d sp %d", fd, rm->space_id);
+ close(fd);
+ return;
+ }
+
+ rv = send(fd, &cb, sizeof(cb), MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (rv == -EAGAIN) {
+ log_error("close callback fd %d sp %d", fd, rm->space_id);
+ close(fd);
+ return;
+ }
+}
+
static void resource_thread_messages(void)
{
struct recv_message *rm, *safe;
@@ -1738,6 +1783,8 @@ static void resource_thread_messages(void)
(unsigned long long)rm->from_generation,
rm->hm.msg, rm->hm.seq);
+ send_callback(rm);
+
if (rm->hm.msg & SANLK_HM_WD_RESET) {
log_error("Host reset request from host_id %llu gen %llu",
(unsigned long long)rm->from_host_id,
diff --git a/src/resource.h b/src/resource.h
index ce97884b1c90..17b6376a18b6 100644
--- a/src/resource.h
+++ b/src/resource.h
@@ -27,7 +27,7 @@ int request_token(struct task *task, struct token *token, uint32_t
force_mode,
int set_resource_examine(char *space_name, char *res_name);
-void set_message_examine(char *space_name, struct sanlk_host_message *hm,
+void set_message_examine(uint32_t space_id, struct sanlk_host_message *hm,
uint64_t from_host_id, uint64_t from_generation);
int res_set_lvb(struct sanlk_resource *res, char *lvb, int lvblen);
diff --git a/src/sanlock.h b/src/sanlock.h
index b95d010ac97b..8e662d5b4a96 100644
--- a/src/sanlock.h
+++ b/src/sanlock.h
@@ -115,6 +115,25 @@ struct sanlk_host_message {
uint32_t ack_seq;
};
+#define SANLK_CB_HOST_MESSAGE 0x00000001
+
+struct sanlk_callback_hm {
+ uint32_t type;
+ uint32_t flags;
+ uint32_t msg;
+ uint32_t seq;
+ uint64_t host_id;
+ uint64_t generation;
+ uint64_t from_host_id;
+ uint64_t from_generation;
+};
+
+struct sanlk_callback {
+ union {
+ struct sanlk_callback_hm hm;
+ };
+};
+
size_t sanlock_path_export(char *dst, const char *src, size_t dstlen);
size_t sanlock_path_import(char *dst, const char *src, size_t dstlen);
diff --git a/src/sanlock_admin.h b/src/sanlock_admin.h
index ee00e478027b..8464a7ce7b2f 100644
--- a/src/sanlock_admin.h
+++ b/src/sanlock_admin.h
@@ -32,6 +32,10 @@
#define SANLK_HOST_DEAD 0x00000005
#define SANLK_HOST_MASK 0x0000000F /* select SANLK_HOST_ from flags */
+int sanlock_reg_lockspace(struct sanlk_lockspace *ls, uint32_t flags);
+
+int sanlock_get_callback(int fd, uint32_t flags, struct sanlk_callback *cb, int
cb_size);
+
/*
* add_lockspace returns:
* 0: the lockspace has been added successfully
diff --git a/src/sanlock_internal.h b/src/sanlock_internal.h
index 414d3f841eb0..966395902508 100644
--- a/src/sanlock_internal.h
+++ b/src/sanlock_internal.h
@@ -170,6 +170,7 @@ struct space {
int external_remove;
int thread_stop;
int wd_fd;
+ int callback_fd;
pthread_t thread;
pthread_mutex_t mutex; /* protects lease_status, thread_stop */
struct lease_status lease_status;
diff --git a/src/sanlock_sock.h b/src/sanlock_sock.h
index b61023720130..42c8d45cad65 100644
--- a/src/sanlock_sock.h
+++ b/src/sanlock_sock.h
@@ -15,6 +15,7 @@
#define SM_MAGIC 0x04282010
#define SM_PROTO 0x00000001
+#define SM_CB_PROTO 0x00000001
#define MAX_CLIENT_MSG (1024 * 1024) /* TODO: this is random */
@@ -49,6 +50,8 @@ enum {
SM_CMD_VERSION = 28,
SM_CMD_SET_MESSAGE = 29,
SM_CMD_READ_LOCKSPACE_MESSAGE = 30,
+ SM_CMD_REG_LOCKSPACE = 31,
+ SM_CMD_CALLBACK = 32,
};
struct sm_header {
diff --git a/tests/sanlk_cb.c b/tests/sanlk_cb.c
new file mode 100644
index 000000000000..d2961b14344f
--- /dev/null
+++ b/tests/sanlk_cb.c
@@ -0,0 +1,110 @@
+#include <inttypes.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <fcntl.h>
+#include <string.h>
+#include <errno.h>
+#include <limits.h>
+#include <time.h>
+#include <signal.h>
+#include <poll.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include "sanlock.h"
+#include "sanlock_admin.h"
+#include "../src/sanlock_sock.h"
+
+int main(int argc, char *argv[])
+{
+ struct sanlk_lockspace ls;
+ struct sm_header h;
+ struct sanlk_callback cb;
+ struct pollfd pollfd;
+ int fd, rv;
+
+ if (argc < 2) {
+ printf("sanlk_cb <lockspace_name>\n");
+ return -1;
+ }
+
+ memset(&ls, 0, sizeof(ls));
+
+ strcpy(ls.name, argv[1]);
+
+ fd = sanlock_reg_lockspace(&ls, 0);
+ if (fd < 0) {
+ printf("reg error %d\n", fd);
+ return -1;
+ }
+
+ printf("sanlock_reg_lockspace fd %d\n", fd);
+
+ memset(&pollfd, 0, sizeof(pollfd));
+ pollfd.fd = fd;
+ pollfd.events = POLLIN;
+
+ while (1) {
+ rv = poll(&pollfd, 1, -1);
+ if (rv == -1 && errno == EINTR)
+ continue;
+
+ if (rv < 0) {
+ printf("poll error %d\n", rv);
+ exit(0);
+ }
+
+ if (pollfd.revents & POLLIN) {
+#if 0
+ rv = recv(fd, &h, sizeof(h), MSG_WAITALL);
+ if (rv < 0) {
+ printf("recv h %d %d\n", rv, errno);
+ return -errno;
+ }
+ if (rv != sizeof(h)) {
+ printf("recv rv %d\n", rv);
+ return -1;
+ }
+
+ printf("h m %x v %x c %d l %d\n",
+ h.magic, h.version, h.cmd, h.length);
+
+ rv = recv(fd, &cb, sizeof(cb), MSG_WAITALL);
+ if (rv < 0) {
+ printf("recv cb %d %d\n", rv, errno);
+ return -errno;
+ }
+ if (rv != sizeof(cb)) {
+ printf("recv rv %d\n", rv);
+ return -1;
+ }
+#endif
+
+ rv = sanlock_get_callback(fd, 0, &cb, sizeof(cb));
+
+ if (cb.hm.type != SANLK_CB_HOST_MESSAGE) {
+ printf("unknown cb type %d\n", cb.hm.type);
+ continue;
+ }
+
+ printf("host message from host_id %llu gen %llu\n",
+ (unsigned long long)cb.hm.from_host_id,
+ (unsigned long long)cb.hm.from_generation);
+ printf("msg 0x%08x seq 0x%08x\n",
+ cb.hm.msg, cb.hm.seq);
+ }
+
+ if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
+ printf("poll revents %x\n", pollfd.revents);
+ exit(0);
+ }
+ }
+
+ return 0;
+}
+
--
1.8.3.1