ldap/servers/slapd/back-ldbm/dblayer.c | 190 +++++++++++++++++++++----
ldap/servers/slapd/back-ldbm/ldbm_config.c | 2
ldap/servers/slapd/back-ldbm/ldbm_config.h | 2
ldap/servers/slapd/back-ldbm/proto-back-ldbm.h | 4
4 files changed, 171 insertions(+), 27 deletions(-)
New commits:
commit 37c531d17ce6dfe1ffd7b42adafc03960def3a40
Author: Ludwig Krispenz <lkrispen(a)redhat.com>
Date: Tue May 14 11:47:12 2013 +0200
Ticket 568 - make usage of transaction batch flush durable
Bug Description: if a transaction batch value > 1 is specified,
a txn commit only flushes the txn log when the batch
limit is reached. This can increase performance, but since
the operation has already been acknowledged to the client, a crash before
flushing the log would violate the durability guarantee.
Fix Description: to allow txn log flushing in batches, if batch-limit > 0
the flushing is done in the log flush thread. On txn commit,
a worker thread adds its transaction to the list of txns to flush,
notifies the log flush thread if necessary, and waits until the log flush thread
confirms that the txn was flushed. Only then does the worker thread
continue and acknowledge success to the client.
Care must be taken that the txn logs are flushed frequently by the
log flush thread, even if fewer txns than the batch limit
are waiting to be flushed.
One condition is based on active txns vs. waiting txns: if there are no more
outstanding txns, flush.
The other condition is time-based, using two configurable timers:
- the log flush thread sleeps on a condition variable until it is notified
by a worker thread that the batch limit is exceeded, or until the wait interval
(nsslapd-db-transaction-batch-max-wait) has passed
- when it wakes up, it checks when the last flush was done; if that is longer ago
than the second timer (nsslapd-db-transaction-batch-min-wait), or if the batch
limit is reached, the txns are flushed
The fix provided here makes the use of batch values safe, but to get a performance
benefit, further changes are required. Right now the txn is started and committed
inside dblayer_lock|unlock_backend, so txns cannot accumulate to be flushed.
So either the order of txn and backend locking needs to be reversed, or the
code in the worker thread handling the synchronization could be moved closer to
sending the result to the client. This optimization question has been moved
to a separate ticket: 47358
The fix includes a minor change for the trickle thread: it is only started if
trickling is required (trickle percentage > 0).
https://fedorahosted.org/389/ticket/568
Reviewed by: Noriko, Thanks
diff --git a/ldap/servers/slapd/back-ldbm/dblayer.c
b/ldap/servers/slapd/back-ldbm/dblayer.c
index e272cfa..2d5f98f 100644
--- a/ldap/servers/slapd/back-ldbm/dblayer.c
+++ b/ldap/servers/slapd/back-ldbm/dblayer.c
@@ -219,9 +219,16 @@ static int dblayer_start_checkpoint_thread(struct ldbminfo *li);
static int dblayer_start_trickle_thread(struct ldbminfo *li);
static int dblayer_start_perf_thread(struct ldbminfo *li);
static int dblayer_start_txn_test_thread(struct ldbminfo *li);
-static int trans_batch_count=1;
+/* Number of committed txns currently queued in txn_log_flush_pending and
+ * waiting for the log flush thread; reset to 0 after every LOG_FLUSH. */
+static int trans_batch_count=0;
static int trans_batch_limit=0;
+/* Both in milliseconds, read by log_flush_threadmain() once at thread start:
+ * min_sleep is the longest time since the last flush before a partial batch
+ * is flushed anyway; max_sleep is the timeout of each wait on
+ * sync_txn_log_do_flush.
+ * NOTE(review): the min/max names do not describe that usage - confirm. */
+static int trans_batch_txn_min_sleep = 50; /* ms */
+static int trans_batch_txn_max_sleep = 50;
static PRBool log_flush_thread=PR_FALSE;
+/* Txns begun with use_lock while log_flush_thread is set and not yet
+ * committed or aborted; compared against trans_batch_count to detect that
+ * every active txn is already waiting for a flush.
+ * NOTE(review): txn abort decrements this without checking use_lock, while
+ * txn begin only increments with use_lock - confirm the count stays balanced. */
+static int txn_in_progress_count = 0;
+/* One txn id per batch slot (slot = value of trans_batch_count when the txn
+ * committed); a committing worker waits until the log flush thread zeroes
+ * its slot. Allocated with config_get_threadnumber() entries.
+ * NOTE(review): nothing visible bounds trans_batch_count by that size -
+ * confirm no more than that many threads can wait for a flush at once. */
+static int *txn_log_flush_pending = NULL;
+/* Guards trans_batch_count, txn_in_progress_count and txn_log_flush_pending. */
+static PRLock *sync_txn_log_flush = NULL;
+/* Broadcast by the log flush thread after each flush; committers wait on it. */
+static PRCondVar *sync_txn_log_flush_done = NULL;
+/* Signalled by a committer when the batch limit is passed or no other txn
+ * is outstanding; the log flush thread waits on it with a timeout. */
+static PRCondVar *sync_txn_log_do_flush = NULL;
static int dblayer_db_remove_ex(dblayer_private_env *env, char const path[], char const
dbName[], PRBool use_lock);
static void dblayer_init_pvt_txn();
static void dblayer_push_pvt_txn(back_txn *txn);
@@ -342,12 +349,59 @@ dblayer_set_batch_transactions(void *arg, void *value, char
*errorbuf, int phase
}
return retval;
}
+/*
+ * Shared implementation of the config setters for
+ * nsslapd-db-transaction-batch-min-wait and -max-wait (milliseconds).
+ *
+ * log_flush_threadmain() converts both values to PRIntervalTimes once, when
+ * the thread starts. A change applied while the server is running is stored
+ * (and reported by the getters) but only takes effect after a restart.
+ *
+ * At startup the configured value is taken as-is; at runtime only positive
+ * values are accepted and anything else is ignored. In particular 0 must NOT
+ * clear log_flush_thread the way dblayer_set_batch_transactions() does:
+ * committing workers keep waiting on sync_txn_log_flush_done as long as
+ * trans_batch_limit > 0, so stopping the flush thread here would block every
+ * subsequent commit forever.
+ * NOTE(review): a startup value of 0 still makes the flush thread spin
+ * (zero wait timeout / zero flush interval) - consider rejecting it too.
+ *
+ * target: the static setting to update.
+ * Returns LDAP_SUCCESS (invalid runtime values are silently ignored, as before).
+ */
+static int
+dblayer_set_batch_txn_sleep(int *target, void *value, int phase, int apply)
+{
+    int val = (int)((uintptr_t)value);
+
+    if (apply && ((phase == CONFIG_PHASE_STARTUP) || (val > 0))) {
+        *target = val;
+    }
+    return LDAP_SUCCESS;
+}
+
+/* Setter for nsslapd-db-transaction-batch-min-wait: the longest time (ms)
+ * since the last flush before the log flush thread flushes a partial batch. */
+int
+dblayer_set_batch_txn_min_sleep(void *arg, void *value, char *errorbuf, int phase, int apply) {
+    return dblayer_set_batch_txn_sleep(&trans_batch_txn_min_sleep, value, phase, apply);
+}
+
+/* Setter for nsslapd-db-transaction-batch-max-wait: the timeout (ms) of each
+ * wait of the log flush thread on sync_txn_log_do_flush. */
+int
+dblayer_set_batch_txn_max_sleep(void *arg, void *value, char *errorbuf, int phase, int apply) {
+    return dblayer_set_batch_txn_sleep(&trans_batch_txn_max_sleep, value, phase, apply);
+}
void *
dblayer_get_batch_transactions(void *arg) {
return (void *)((uintptr_t)trans_batch_limit);
}
+/* Getter for nsslapd-db-transaction-batch-min-wait (milliseconds). */
+void *
+dblayer_get_batch_txn_min_sleep(void *arg) {
+    return (void *)((uintptr_t)trans_batch_txn_min_sleep);
+}
+
+/* Getter for nsslapd-db-transaction-batch-max-wait (milliseconds). */
+void *
+dblayer_get_batch_txn_max_sleep(void *arg) {
+    return (void *)((uintptr_t)trans_batch_txn_max_sleep);
+}
/*
Threading: dblayer isolates upper layers from threading considerations
@@ -3507,10 +3561,17 @@ dblayer_txn_begin_ext(struct ldbminfo *li, back_txnid parent_txn,
back_txn *txn,
{
/* this txn is now our current transaction for current operations
and new parent for any nested transactions created */
- dblayer_push_pvt_txn(&new_txn);
- if (txn) {
- txn->back_txn_txn = new_txn.back_txn_txn;
- }
+ if (use_lock && log_flush_thread) {
+ int txn_id = new_txn.back_txn_txn->id(new_txn.back_txn_txn);
+ PR_Lock(sync_txn_log_flush);
+ txn_in_progress_count++;
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_begin: batchcount: %d, txn_in_progress:
%d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count, txn_id);
+ PR_Unlock(sync_txn_log_flush);
+ }
+ dblayer_push_pvt_txn(&new_txn);
+ if (txn) {
+ txn->back_txn_txn = new_txn.back_txn_txn;
+ }
}
} else
{
@@ -3554,6 +3615,8 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn,
PRBool use_lock)
dblayer_private *priv = NULL;
DB_TXN *db_txn = NULL;
back_txn *cur_txn = NULL;
+ int txn_id = 0;
+ int txn_batch_slot = 0;
PR_ASSERT(NULL != li);
@@ -3576,6 +3639,7 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn,
PRBool use_lock)
priv->dblayer_env &&
priv->dblayer_enable_transactions)
{
+ txn_id = db_txn->id(db_txn);
return_value = TXN_COMMIT(db_txn, 0);
/* if we were given a transaction, and it is the same as the
current transaction in progress, pop it off the stack
@@ -3590,17 +3654,33 @@ int dblayer_txn_commit_ext(struct ldbminfo *li, back_txn *txn,
PRBool use_lock)
}
if ((priv->dblayer_durable_transactions) && use_lock ) {
if(trans_batch_limit > 0) {
- if(trans_batch_count % trans_batch_limit) {
- trans_batch_count++;
- } else {
- LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
- trans_batch_count=1;
- }
+ /* let log_flush thread do the flushing */
+ PR_Lock(sync_txn_log_flush);
+ txn_batch_slot = trans_batch_count++;
+ txn_log_flush_pending[txn_batch_slot] = txn_id;
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (befor notify): batchcount: %d,
txn_in_progress: %d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count,
txn_id);
+ /* the log flush thread will periodically flush the txn log,
+ * but in two cases it should be notified to do it immediately:
+ * - the batch limit is passed
+ * - there is no other outstanding txn
+ */
+ if (trans_batch_count > trans_batch_limit ||
+ trans_batch_count == txn_in_progress_count)
+ PR_NotifyCondVar(sync_txn_log_do_flush);
+ /* we need to wait until the txn has been flushed before continuing
+ * and returning success to the client, nit to vialate durability
+ * PR_WaitCondvar releases and reaquires the lock
+ */
+ while (txn_log_flush_pending[txn_batch_slot] == txn_id)
+ PR_WaitCondVar(sync_txn_log_flush_done, PR_INTERVAL_NO_TIMEOUT);
+ txn_in_progress_count--;
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_commit (before unlock): batchcount: %d,
txn_in_progress: %d, curr_txn %x\n", trans_batch_count, txn_in_progress_count,
txn_id);
+ PR_Unlock(sync_txn_log_flush);
} else if(trans_batch_limit == FLUSH_REMOTEOFF) { /* user remotely turned
batching off */
LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
- }
- }
- if(use_lock) slapi_rwlock_unlock(priv->dblayer_env->dblayer_env_lock);
+ }
+ }
+ if(use_lock) slapi_rwlock_unlock(priv->dblayer_env->dblayer_env_lock);
} else
{
return_value = 0;
@@ -3663,6 +3743,13 @@ int dblayer_txn_abort_ext(struct ldbminfo *li, back_txn *txn,
PRBool use_lock)
priv->dblayer_env &&
priv->dblayer_enable_transactions)
{
+ int txn_id = db_txn->id(db_txn);
+ if (log_flush_thread) {
+ PR_Lock(sync_txn_log_flush);
+ txn_in_progress_count--;
+ PR_Unlock(sync_txn_log_flush);
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "txn_abort : batchcount: %d, txn_in_progress:
%d, curr_txn: %x\n", trans_batch_count, txn_in_progress_count, txn_id);
+ }
return_value = TXN_ABORT(db_txn);
/* if we were given a transaction, and it is the same as the
current transaction in progress, pop it off the stack
@@ -4336,10 +4423,17 @@ static int
dblayer_start_log_flush_thread(dblayer_private *priv)
{
int return_value = 0;
+ int max_threads = config_get_threadnumber();
if ((priv->dblayer_durable_transactions) &&
(priv->dblayer_enable_transactions) && (trans_batch_limit > 0)) {
log_flush_thread=PR_TRUE;
+ /* initialize the synchronization objects for the log_flush and worker threads */
+ sync_txn_log_flush = PR_NewLock();
+ sync_txn_log_flush_done = PR_NewCondVar (sync_txn_log_flush);
+ sync_txn_log_do_flush = PR_NewCondVar (sync_txn_log_flush);
+ txn_log_flush_pending = (int*)slapi_ch_malloc(max_threads*sizeof(int));
+
if (NULL == PR_CreateThread (PR_USER_THREAD,
(VFP) (void *) log_flush_threadmain, priv,
PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
@@ -4365,28 +4459,65 @@ dblayer_start_log_flush_thread(dblayer_private *priv)
static int log_flush_threadmain(void *param)
{
dblayer_private *priv = NULL;
- PRIntervalTime interval;
+ PRIntervalTime interval_wait, interval_flush, interval_def;
+ PRIntervalTime last_flush;
+ int i;
+ int do_flush = 0;
PR_ASSERT(NULL != param);
priv = (dblayer_private *) param;
INCR_THREAD_COUNT(priv);
- interval = PR_MillisecondsToInterval(300);
+ interval_flush = PR_MillisecondsToInterval(trans_batch_txn_min_sleep);
+ interval_wait = PR_MillisecondsToInterval(trans_batch_txn_max_sleep);
+ interval_def = PR_MillisecondsToInterval(300); /*used while no txn or txn batching
*/
+ /* LK this is only needed if online change of
+ * of txn config is supported ???
+ */
while ((!priv->dblayer_stop_threads) && (log_flush_thread))
{
if (priv->dblayer_enable_transactions)
- {
- DB_CHECKPOINT_LOCK(1, priv->dblayer_env->dblayer_env_lock);
- if(trans_batch_limit > 0) {
- if(trans_batch_count > 1) {
- LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
- trans_batch_count=1;
- }
- }
- DB_CHECKPOINT_UNLOCK(1, priv->dblayer_env->dblayer_env_lock);
- }
- DS_Sleep(interval);
+ {
+ if (trans_batch_limit > 0) {
+ /* synchronize flushing thread with workers */
+ PR_Lock(sync_txn_log_flush);
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (in loop): batchcount:
%d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
+ /* if here, do flush the txn logs if any of the following conditions are met
+ * - batch limit exceeded
+ * - no more active transaction, no need to wait
+ * - do_flush indicate that the max waiting interval is exceeded
+ */
+ if(trans_batch_count >= trans_batch_limit || trans_batch_count ==
txn_in_progress_count || do_flush) {
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (working): batchcount:
%d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
+ LOG_FLUSH(priv->dblayer_env->dblayer_DB_ENV,0);
+ for (i=0;i<trans_batch_count;i++)
+ txn_log_flush_pending[i] = 0;
+ trans_batch_count = 0;
+ last_flush = PR_IntervalNow();
+ do_flush = 0;
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (before notify):
batchcount: %d, txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count,
0);
+ PR_NotifyAllCondVar(sync_txn_log_flush_done);
+ }
+ /* wait until flushing conditions are met */
+ while ( trans_batch_count == 0 ||
+ ( trans_batch_count < trans_batch_limit &&
+ trans_batch_count < txn_in_progress_count)) {
+ if (priv->dblayer_stop_threads) break;
+ if (PR_IntervalNow() - last_flush > interval_flush) {
+ do_flush = 1;
+ break;
+ }
+ PR_WaitCondVar(sync_txn_log_do_flush, interval_wait);
+ }
+ PR_Unlock(sync_txn_log_flush);
+ LDAPDebug(LDAP_DEBUG_BACKLDBM, "log_flush_threadmain (wakeup): batchcount: %d,
txn_in_progress: %d\n", trans_batch_count, txn_in_progress_count, 0);
+ } else {
+ DS_Sleep(interval_def);
+ }
+ } else {
+ DS_Sleep(interval_def);
+ }
}
DECR_THREAD_COUNT(priv);
@@ -4580,6 +4711,11 @@ static int
dblayer_start_trickle_thread(struct ldbminfo *li)
{
int return_value = 0;
+ dblayer_private *priv = (dblayer_private*)li->li_dblayer_private;
+
+ if (priv->dblayer_trickle_percentage == 0)
+ return return_value;
+
if (NULL == PR_CreateThread (PR_USER_THREAD,
(VFP) (void *) trickle_threadmain, li,
PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_config.c
b/ldap/servers/slapd/back-ldbm/ldbm_config.c
index 232af54..b72984b 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_config.c
+++ b/ldap/servers/slapd/back-ldbm/ldbm_config.c
@@ -1326,6 +1326,8 @@ static config_info ldbm_config[] = {
{CONFIG_DB_TRANSACTION_LOGGING, CONFIG_TYPE_ONOFF, "on",
&ldbm_config_db_transaction_logging_get, &ldbm_config_db_transaction_logging_set,
0},
{CONFIG_DB_CHECKPOINT_INTERVAL, CONFIG_TYPE_INT, "60",
&ldbm_config_db_checkpoint_interval_get, &ldbm_config_db_checkpoint_interval_set,
CONFIG_FLAG_ALWAYS_SHOW|CONFIG_FLAG_ALLOW_RUNNING_CHANGE},
{CONFIG_DB_TRANSACTION_BATCH, CONFIG_TYPE_INT, "0",
&dblayer_get_batch_transactions, &dblayer_set_batch_transactions,
CONFIG_FLAG_ALWAYS_SHOW|CONFIG_FLAG_ALLOW_RUNNING_CHANGE},
+ {CONFIG_DB_TRANSACTION_BATCH_MIN_SLEEP, CONFIG_TYPE_INT, "50",
&dblayer_get_batch_txn_min_sleep, &dblayer_set_batch_txn_min_sleep,
CONFIG_FLAG_ALWAYS_SHOW|CONFIG_FLAG_ALLOW_RUNNING_CHANGE},
+ {CONFIG_DB_TRANSACTION_BATCH_MAX_SLEEP, CONFIG_TYPE_INT, "50",
&dblayer_get_batch_txn_max_sleep, &dblayer_set_batch_txn_max_sleep,
CONFIG_FLAG_ALWAYS_SHOW|CONFIG_FLAG_ALLOW_RUNNING_CHANGE},
{CONFIG_DB_LOGBUF_SIZE, CONFIG_TYPE_SIZE_T, "0",
&ldbm_config_db_logbuf_size_get, &ldbm_config_db_logbuf_size_set,
CONFIG_FLAG_ALWAYS_SHOW},
{CONFIG_DB_PAGE_SIZE, CONFIG_TYPE_SIZE_T, "0",
&ldbm_config_db_page_size_get, &ldbm_config_db_page_size_set, 0},
{CONFIG_DB_INDEX_PAGE_SIZE, CONFIG_TYPE_SIZE_T, "0",
&ldbm_config_db_index_page_size_get, &ldbm_config_db_index_page_size_set, 0},
diff --git a/ldap/servers/slapd/back-ldbm/ldbm_config.h
b/ldap/servers/slapd/back-ldbm/ldbm_config.h
index a5830e3..204f64d 100644
--- a/ldap/servers/slapd/back-ldbm/ldbm_config.h
+++ b/ldap/servers/slapd/back-ldbm/ldbm_config.h
@@ -111,6 +111,8 @@ struct config_info {
#define CONFIG_DB_TRANSACTION_LOGGING "nsslapd-db-transaction-logging"
#define CONFIG_DB_CHECKPOINT_INTERVAL "nsslapd-db-checkpoint-interval"
#define CONFIG_DB_TRANSACTION_BATCH "nsslapd-db-transaction-batch-val"
+/* Both in milliseconds (default "50"), read by the log flush thread once at
+ * start. min-wait: longest time since the last flush before a partial batch
+ * is flushed; max-wait: timeout of each wait for a flush notification. */
+#define CONFIG_DB_TRANSACTION_BATCH_MIN_SLEEP
"nsslapd-db-transaction-batch-min-wait"
+#define CONFIG_DB_TRANSACTION_BATCH_MAX_SLEEP
"nsslapd-db-transaction-batch-max-wait"
#define CONFIG_DB_LOGBUF_SIZE "nsslapd-db-logbuf-size"
#define CONFIG_DB_PAGE_SIZE "nsslapd-db-page-size"
#define CONFIG_DB_INDEX_PAGE_SIZE "nsslapd-db-index-page-size" /* With the new
diff --git a/ldap/servers/slapd/back-ldbm/proto-back-ldbm.h
b/ldap/servers/slapd/back-ldbm/proto-back-ldbm.h
index e87c900..a326185 100644
--- a/ldap/servers/slapd/back-ldbm/proto-back-ldbm.h
+++ b/ldap/servers/slapd/back-ldbm/proto-back-ldbm.h
@@ -163,7 +163,11 @@ PRInt64 db_atol(char *str, int *err);
PRInt64 db_atoi(char *str, int *err);
unsigned long db_strtoul(const char *str, int *err);
int dblayer_set_batch_transactions(void *arg, void *value, char *errorbuf, int phase, int
apply);
+/* Config set/get handlers for nsslapd-db-transaction-batch-min-wait and
+ * nsslapd-db-transaction-batch-max-wait (milliseconds). */
+int dblayer_set_batch_txn_min_sleep(void *arg, void *value, char *errorbuf, int phase,
int apply);
+int dblayer_set_batch_txn_max_sleep(void *arg, void *value, char *errorbuf, int phase,
int apply);
void *dblayer_get_batch_transactions(void *arg);
+void *dblayer_get_batch_txn_min_sleep(void *arg);
+void *dblayer_get_batch_txn_max_sleep(void *arg);
int dblayer_in_import(ldbm_instance *inst);
int dblayer_update_db_ext(ldbm_instance *inst, char *oldext, char *newext);