ldap/servers/plugins/replication/cl5_api.c | 104 ++++
ldap/servers/plugins/replication/cl5_api.h | 4
ldap/servers/plugins/replication/repl5.h | 18
ldap/servers/plugins/replication/repl5_agmt.c | 9
ldap/servers/plugins/replication/repl5_init.c | 63 ++
ldap/servers/plugins/replication/repl5_plugins.c | 10
ldap/servers/plugins/replication/repl5_replica_config.c | 344 +++++++++++++++-
ldap/servers/plugins/replication/repl5_ruv.c | 33 +
ldap/servers/plugins/replication/repl_extop.c | 341 +++++++++++++++
9 files changed, 897 insertions(+), 29 deletions(-)
New commits:
commit 0f50544b9567907edd0ba645951d7cd325354107
Author: root <root(a)localhost.localdomain>
Date: Mon Apr 23 13:36:04 2012 -0400
Ticket #337 - RFE - Improve CLEANRUV functionality
Bug Description: Previously the steps to remove a replica and its RUV was
problematic.
I created two new "tasks" to take care of the entire
replication environment.
Fix Description:
[1] The new task "CLEANALLRUV<rid>" - run it once on any master
This marks the rid as invalid. Used to reject updates to the changelog, and the
database RUV
It then sends a "CLEANRUV" extended operation to each agreement.
Then it cleans its own RUV.
The CLEANRUV extended op then triggers that replica to send the the same CLEANRUV
extop to its replicas, then it cleans its own RID. Basically
this operation cascades through the entire replication environment.
[2] The "RELEASERUV<rid>" task - run it once on any master
Once the RUV's have been cleaned on all the replicas, you need to
"release" the rid so that it can be reused. This operation also cascades thro
ugh the entire replication environment. This also triggers changelog trimming.
For all of this to work correctly, there is a list of steps that needs to be followed.
This procedure is attached to the ticket.
https://fedorahosted.org/389/ticket/337
reviewed by: ?
diff --git a/ldap/servers/plugins/replication/cl5_api.c
b/ldap/servers/plugins/replication/cl5_api.c
index 2b57f91..03d50ca 100644
--- a/ldap/servers/plugins/replication/cl5_api.c
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -355,6 +355,7 @@ static int _cl5WriteRUV (CL5DBFile *file, PRBool purge);
static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge);
static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv);
+void trigger_cl_trimming_thread();
/* bakup/recovery, import/export */
static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op,
@@ -3477,6 +3478,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim)
count = 0;
txnid = NULL;
abort = PR_FALSE;
+ ReplicaId rid;
/* DB txn lock accessed pages until the end of the transaction. */
@@ -3497,13 +3499,14 @@ static void _cl5TrimFile (Object *obj, long *numToTrim)
* This change can be trimmed if it exceeds purge
* parameters and has been seen by all consumers.
*/
+ rid = csn_get_replicaid (op.csn);
if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) &&
ruv_covers_csn_strict (ruv, op.csn) )
- {
+ {
rc = _cl5CurrentDeleteEntry (it);
- if ( rc == CL5_SUCCESS )
+ if ( rc == CL5_SUCCESS && !is_released_rid(rid))
{
- /* update purge vector */
+ /* update purge vector, unless this is a released rid */
rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE);
}
if ( rc == CL5_SUCCESS)
@@ -3529,8 +3532,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim)
* the trim forever.
*/
CSN *maxcsn = NULL;
- ReplicaId rid;
-
rid = csn_get_replicaid (op.csn);
ruv_get_largest_csn_for_replica (ruv, rid, &maxcsn);
if ( csn_compare (op.csn, maxcsn) != 0 )
@@ -3620,10 +3621,10 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
return ( *numToTrim > 0 );
}
-
+
if (s_cl5Desc.dbTrim.maxEntries > 0 &&
(*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
- return PR_TRUE;
+ return PR_TRUE;
if (time)
return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
@@ -3805,6 +3806,7 @@ static int _cl5ConstructRUV (const char *replGen, Object *obj,
PRBool purge)
void *iterator = NULL;
slapi_operation_parameters op = {0};
CL5DBFile *file;
+ ReplicaId rid;
PR_ASSERT (replGen && obj);
@@ -3828,6 +3830,15 @@ static int _cl5ConstructRUV (const char *replGen, Object *obj,
PRBool purge)
rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL);
while (rc == CL5_SUCCESS)
{
+ rid = csn_get_replicaid (op.csn);
+ if(is_cleaned_rid(rid)){
+ /* skip this entry as the rid is invalid */
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV:
"
+ "skipping entry because its csn contains a cleaned rid(%d)\n",
rid);
+ cl5_operation_parameters_done (&op);
+ rc = _cl5GetNextEntry (&entry, iterator);
+ continue;
+ }
if (purge)
rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn);
else
@@ -3876,28 +3887,32 @@ static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool
newReplica, PRBool purge
file = (CL5DBFile*)object_get_data (obj);
- /* if purge is TRUE, file->purgeRUV must be set;
- if purge is FALSE, maxRUV must be set */
+ /*
+ * if purge is TRUE, file->purgeRUV must be set;
+ * if purge is FALSE, maxRUV must be set
+ */
PR_ASSERT (file && ((purge && file->purgeRUV) || (!purge
&& file->maxRUV)));
+ rid = csn_get_replicaid(csn);
/* update vector only if this replica is not yet part of RUV */
if (purge && newReplica)
{
- rid = csn_get_replicaid(csn);
if (ruv_contains_replica (file->purgeRUV, rid))
return CL5_SUCCESS;
- else
+ else if(is_cleaned_rid(rid))
{
- /* if the replica is not part of the purgeRUV yet, add it */
- ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl());
+ /* if the replica is not part of the purgeRUV yet, add it unless it's
from a cleaned rid */
+ ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl());
}
}
else
{
if (purge)
rc = ruv_set_csns(file->purgeRUV, csn, NULL);
- else
- rc = ruv_set_csns(file->maxRUV, csn, NULL);
+ else if(is_cleaned_rid(rid)){
+ /* don't update maxRuv is if rid is cleaned */
+ rc = ruv_set_csns(file->maxRUV, csn, NULL);
+ }
}
if (rc != RUV_SUCCESS)
@@ -4502,9 +4517,17 @@ static int _cl5WriteOperationTxn(const char *replName, const char
*replGen,
CL5Entry entry;
CL5DBFile *file = NULL;
Object *file_obj = NULL;
+ ReplicaId rid = csn_get_replicaid (op->csn);
DB_TXN *txnid = NULL;
DB_TXN *parent_txnid = (DB_TXN *)txn;
+ /*
+ * If the op csn contains the cleaned rid, don't write it
+ */
+ if(is_cleaned_rid(rid)){
+ return CL5_SUCCESS;
+ }
+
rc = _cl5GetDBFileByReplicaName (replName, replGen, &file_obj);
if (rc == CL5_NOTFOUND)
{
@@ -4770,7 +4793,7 @@ static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void
**iterator, DB_
rc, db_strerror(rc));
rc = CL5_DB_ERROR;
-done:;
+done:
/* error occured */
/* We didn't success in assigning this cursor to the iterator,
* so we need to free the cursor here */
@@ -6512,3 +6535,52 @@ bail:
changelog5_config_done(&config);
return rc;
}
+
+/*
+ * Clean the in memory RUV, at shutdown we will write the update to the db
+ */
+void
+cl5CleanRUV(ReplicaId rid){
+ CL5DBFile *file;
+ Object *obj;
+
+ obj = objset_first_obj(s_cl5Desc.dbFiles);
+ while (obj){
+ file = (CL5DBFile *)object_get_data(obj);
+ ruv_delete_replica(file->purgeRUV, rid);
+ ruv_delete_replica(file->maxRUV, rid);
+ obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
+ }
+
+ if (obj)
+ object_release (obj);
+}
+
+void trigger_cl_trimming(){
+ PRThread *trim_tid = NULL;
+ ReplicaId rid = get_released_rid();
+
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_trimming: rid
(%d)\n",(int)rid);
+ trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_trimming_thread,
+ NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+ PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
+ if (NULL == trim_tid){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "trigger_cl_trimming: failed to create trimming "
+ "thread; NSPR error - %d\n", PR_GetError ());
+ } else {
+ /* need a little time for the thread to get started */
+ DS_Sleep(PR_SecondsToInterval(1));
+ }
+}
+
+void
+trigger_cl_trimming_thread(){
+ /* make sure we have a change log, and we aren't closing it */
+ if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
+ return;
+ }
+ _cl5AddThread();
+ _cl5DoTrimming();
+ _cl5RemoveThread();
+}
diff --git a/ldap/servers/plugins/replication/cl5_api.h
b/ldap/servers/plugins/replication/cl5_api.h
index 6f2552f..b9c3dd8 100644
--- a/ldap/servers/plugins/replication/cl5_api.h
+++ b/ldap/servers/plugins/replication/cl5_api.h
@@ -487,4 +487,8 @@ int cl5WriteRUV();
Return: TRUE
*/
int cl5DeleteRUV();
+void cl5CleanRUV(ReplicaId rid);
+void cl5NotifyCleanup(int rid);
+void trigger_cl_trimming();
+
#endif
diff --git a/ldap/servers/plugins/replication/repl5.h
b/ldap/servers/plugins/replication/repl5.h
index ac2cd88..e315150 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -94,6 +94,9 @@
* new set of start and response extops. */
#define REPL_START_NSDS90_REPLICATION_REQUEST_OID "2.16.840.1.113730.3.5.12"
#define REPL_NSDS90_REPLICATION_RESPONSE_OID "2.16.840.1.113730.3.5.13"
+/* cleanruv/releaseruv extended ops */
+#define REPL_CLEANRUV_OID "2.16.840.1.113730.3.6.5"
+#define REPL_RELEASERUV_OID "2.16.840.1.113730.3.6.6"
/* DS 5.0 replication protocol error codes */
@@ -218,6 +221,8 @@ char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
/* In repl_extop.c */
int multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb);
int multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb);
+int multimaster_extop_cleanruv(Slapi_PBlock *pb);
+int multimaster_extop_releaseruv(Slapi_PBlock *pb);
int extop_noop(Slapi_PBlock *pb);
struct berval *NSDS50StartReplicationRequest_new(const char *protocol_oid,
const char *repl_root, char **extra_referrals, CSN *csn);
@@ -345,8 +350,8 @@ char **agmt_get_fractional_attrs_total(const Repl_Agmt *ra);
char **agmt_validate_replicated_attributes(Repl_Agmt *ra, int total);
void* agmt_get_priv (const Repl_Agmt *agmt);
void agmt_set_priv (Repl_Agmt *agmt, void* priv);
-
int get_agmt_agreement_type ( Repl_Agmt *agmt);
+void* agmt_get_connection( Repl_Agmt *ra);
int agmt_has_protocol(Repl_Agmt *agmt);
typedef struct replica Replica;
@@ -579,6 +584,17 @@ void multimaster_be_state_change (void *handle, char *be_name, int
old_be_state,
int replica_config_init();
void replica_config_destroy ();
int get_replica_type(Replica *r);
+int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid);
+void set_cleaned_rid(ReplicaId rid);
+void delete_cleaned_rid();
+int is_cleaned_rid(ReplicaId rid);
+int get_released_rid();
+void set_released_rid(int rid);
+int is_released_rid(int rid);
+int is_already_released_rid();
+void delete_released_rid();
+
+#define ALREADY_RELEASED -1
/* replutil.c */
LDAPControl* create_managedsait_control ();
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c
b/ldap/servers/plugins/replication/repl5_agmt.c
index 1d9affa..8714021 100644
--- a/ldap/servers/plugins/replication/repl5_agmt.c
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -2425,6 +2425,15 @@ ReplicaId agmt_get_consumerRID(Repl_Agmt *ra)
return ra->consumerRID;
}
+void* agmt_get_connection(Repl_Agmt *ra)
+{
+ if(ra->protocol){
+ return (void *)prot_get_connection(ra->protocol);
+ } else {
+ return NULL;
+ }
+}
+
int
agmt_has_protocol(Repl_Agmt *agmt)
{
diff --git a/ldap/servers/plugins/replication/repl5_init.c
b/ldap/servers/plugins/replication/repl5_init.c
index 4e2464c..a878b19 100644
--- a/ldap/servers/plugins/replication/repl5_init.c
+++ b/ldap/servers/plugins/replication/repl5_init.c
@@ -118,6 +118,22 @@ static char *response_name_list[] = {
NSDS_REPL_NAME_PREFIX " Response",
NULL
};
+static char *cleanruv_oid_list[] = {
+ REPL_CLEANRUV_OID,
+ NULL
+};
+static char *cleanruv_name_list[] = {
+ NSDS_REPL_NAME_PREFIX " Cleanruv",
+ NULL
+};
+static char *releaseruv_oid_list[] = {
+ REPL_RELEASERUV_OID,
+ NULL
+};
+static char *releaseruv_name_list[] = {
+ NSDS_REPL_NAME_PREFIX " Releaseruv",
+ NULL
+};
/* List of plugin identities for every plugin registered. Plugin identity
is passed by the server in the plugin init function and must be supplied
@@ -434,6 +450,51 @@ multimaster_response_extop_init( Slapi_PBlock *pb )
return rc;
}
+int
+multimaster_cleanruv_extop_init( Slapi_PBlock *pb )
+{
+ int rc= 0; /* OK */
+ void *identity = NULL;
+
+ /* get plugin identity and store it to pass to internal operations */
+ slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity);
+ PR_ASSERT (identity);
+
+ if (slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION, SLAPI_PLUGIN_VERSION_01 ) != 0 ||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION, (void *)&multimasterextopdesc ) !=
0 ||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_OIDLIST, (void *)cleanruv_oid_list ) != 0
||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_NAMELIST, (void *)cleanruv_name_list ) != 0
||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_FN, (void *)multimaster_extop_cleanruv ))
+ {
+ slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name,
"multimaster_cleanruv_extop_init failed\n" );
+ rc= -1;
+ }
+
+ return rc;
+}
+
+int
+multimaster_releaseruv_extop_init( Slapi_PBlock *pb )
+{
+ int rc= 0; /* OK */
+ void *identity = NULL;
+
+ /* get plugin identity and store it to pass to internal operations */
+ slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity);
+ PR_ASSERT (identity);
+
+ if (slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION, SLAPI_PLUGIN_VERSION_01 ) != 0 ||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION, (void *)&multimasterextopdesc ) !=
0 ||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_OIDLIST, (void *)releaseruv_oid_list ) != 0
||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_NAMELIST, (void *)releaseruv_name_list ) != 0
||
+ slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_FN, (void *)multimaster_extop_releaseruv ))
+ {
+ slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name,
"multimaster_releaseruv_extop_init failed\n" );
+ rc= -1;
+ }
+
+ return rc;
+}
static PRBool
check_for_ldif_dump(Slapi_PBlock *pb)
@@ -618,6 +679,8 @@ int replication_multimaster_plugin_init(Slapi_PBlock *pb)
rc= slapi_register_plugin("extendedop", 1 /* Enabled */,
"multimaster_end_extop_init", multimaster_end_extop_init, "Multimaster
replication end extended operation plugin", NULL, identity);
rc= slapi_register_plugin("extendedop", 1 /* Enabled */,
"multimaster_total_extop_init", multimaster_total_extop_init, "Multimaster
replication total update extended operation plugin", NULL, identity);
rc= slapi_register_plugin("extendedop", 1 /* Enabled */,
"multimaster_response_extop_init", multimaster_response_extop_init,
"Multimaster replication extended response plugin", NULL, identity);
+ rc= slapi_register_plugin("extendedop", 1 /* Enabled */,
"multimaster_cleanruv_extop_init", multimaster_cleanruv_extop_init,
"Multimaster replication cleanruv extended operation plugin", NULL, identity);
+ rc= slapi_register_plugin("extendedop", 1 /* Enabled */,
"multimaster_releaseruv_extop_init", multimaster_releaseruv_extop_init,
"Multimaster replication releaserid extended response plugin", NULL, identity);
if (0 == rc)
{
multimaster_initialised = 1;
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c
b/ldap/servers/plugins/replication/repl5_plugins.c
index c806c08..dfbb80e 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -1005,6 +1005,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
r = (Replica*)object_get_data (repl_obj);
PR_ASSERT (r);
+ /*
+ * In case we had to run cleanruv, we don't want to continue to write
+ * updates to the changelog/database ruv from that replica(rid).
+ */
+ if( is_cleaned_rid(replica_get_rid(r))){
+ /* this RID has been cleaned, just goto done */
+ goto done;
+ }
+
if (replica_is_flag_set (r, REPLICA_LOG_CHANGES) &&
(cl5GetState () == CL5_STATE_OPEN))
{
@@ -1111,6 +1120,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
update_ruv_component(r, opcsn, pb);
}
+done:
object_release (repl_obj);
return return_value;
}
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c
b/ldap/servers/plugins/replication/repl5_replica_config.c
index 94f4179..78a948b 100644
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
@@ -56,9 +56,15 @@
#define LDIF2CL_TASK "LDIF2CL"
#define CLEANRUV "CLEANRUV"
#define CLEANRUVLEN 8
+#define CLEANALLRUV "CLEANALLRUV"
+#define CLEANALLRUVLEN 11
+#define RELEASERUV "RELEASERUV"
+#define RELEASERUVLEN 10
#define REPLICA_RDN "cn=replica"
int slapi_log_urp = SLAPI_LOG_REPL;
+static ReplicaId cleaned_rid = 0;
+static int released_rid = 0;
/* Forward Declartions */
static int replica_config_add (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter,
int *returncode, char *returntext, void *arg);
@@ -74,6 +80,9 @@ static int replica_execute_task (Object *r, const char *task_name, char
*returnt
static int replica_execute_cl2ldif_task (Object *r, char *returntext);
static int replica_execute_ldif2cl_task (Object *r, char *returntext);
static int replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext);
+static int replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char
*returntext);
+static int replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext);
+static struct berval *create_ruv_payload(char *value);
static int replica_cleanup_task (Object *r, const char *task_name, char *returntext, int
apply_mods);
static int replica_task_done(Replica *replica);
@@ -825,10 +834,8 @@ static int replica_execute_task (Object *r, const char *task_name,
char *returnt
{
int temprid = atoi(&(task_name[CLEANRUVLEN]));
if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
- PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE,
- "Invalid replica id for task - %s", task_name);
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
- "replica_execute_task: %s\n", returntext);
+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for
task - %s", temprid, task_name);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_execute_task:
%s\n", returntext);
return LDAP_OPERATIONS_ERROR;
}
if (apply_mods)
@@ -838,6 +845,36 @@ static int replica_execute_task (Object *r, const char *task_name,
char *returnt
else
return LDAP_SUCCESS;
}
+ else if (strncasecmp (task_name, CLEANALLRUV, CLEANALLRUVLEN) == 0)
+ {
+ int temprid = atoi(&(task_name[CLEANALLRUVLEN]));
+ if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for
task - (%s)", temprid, task_name);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_execute_task:
%s\n", returntext);
+ return LDAP_OPERATIONS_ERROR;
+ }
+ if (apply_mods)
+ {
+ return replica_execute_cleanall_ruv_task(r, (ReplicaId)temprid, returntext);
+ }
+ else
+ return LDAP_SUCCESS;
+ }
+ else if (strncasecmp (task_name, RELEASERUV, RELEASERUVLEN) == 0)
+ {
+ int temprid = atoi(&(task_name[RELEASERUVLEN]));
+ if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
+ PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id for task -
%s", task_name);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,"replica_execute_task:
%s\n", returntext);
+ return LDAP_OPERATIONS_ERROR;
+ }
+ if (apply_mods)
+ {
+ return replica_execute_release_ruv_task(r, (ReplicaId)temprid, returntext);
+ }
+ else
+ return LDAP_SUCCESS;
+ }
else
{
PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "unsupported replica task
- %s", task_name);
@@ -1099,16 +1136,24 @@ _replica_config_get_mtnode_ext (const Slapi_Entry *e)
return ext;
}
+int
+replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid)
+{
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: calling
clean_ruv_ext\n");
+ return replica_execute_cleanruv_task(r, rid, NULL);
+}
+
static int
-replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext)
+replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not used
*/)
{
int rc = 0;
Object *RUVObj;
RUV *local_ruv = NULL;
- Replica *replica = (Replica*)object_get_data (r);
+ Replica *replica = (Replica*)object_get_data (r);
- PR_ASSERT (replica);
+ PR_ASSERT (replica);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_task: cleaning rid
(%d)...\n",(int)rid);
RUVObj = replica_get_ruv(replica);
PR_ASSERT(RUVObj);
local_ruv = (RUV*)object_get_data (RUVObj);
@@ -1128,10 +1173,295 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char
*returntext)
/* Update Mapping Tree to reflect RUV changes */
consumer5_set_mapping_tree_state_for_replica(replica, NULL);
+ /*
+ * Clean the changelog RUV's, and set the rids
+ */
+ cl5CleanRUV(rid);
+ set_cleaned_rid(rid);
+ delete_released_rid();
+
if (rc != RUV_SUCCESS){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task
failed(%d)\n",rc);
return LDAP_OPERATIONS_ERROR;
}
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_task: finished
successfully\n");
return LDAP_SUCCESS;
}
+static int
+replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext)
+{
+ Repl_Connection *conn;
+ Replica *replica = (Replica*)object_get_data (r);
+ Object *agmt_obj;
+ Repl_Agmt *agmt;
+ ConnResult crc;
+ const Slapi_DN *dn = NULL;
+ struct berval *payload = NULL;
+ char *ridstr = NULL;
+ int send_msgid = 0;
+ int rc = 0;
+
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: cleaning rid
(%d)...\n",(int)rid);
+
+ /*
+ * Create payload
+ */
+ ridstr = slapi_ch_smprintf("%d:%s", rid,
slapi_sdn_get_dn(replica_get_root(replica)));
+ payload = create_ruv_payload(ridstr);
+ if(payload == NULL){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to
create ext op payload, aborting task\n");
+ goto done;
+ }
+
+ agmt_obj = agmtlist_get_first_agreement_for_replica (replica);
+ while (agmt_obj)
+ {
+ agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+ dn = agmt_get_dn_byref(agmt);
+ conn = (Repl_Connection *)agmt_get_connection(agmt);
+ if(conn == NULL){
+ /* no connection for this agreement, and move on */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: the replica
(%s), is "
+ "missing the connection. This replica will not be cleaned.\n",
slapi_sdn_get_dn(dn));
+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+ continue;
+ }
+ crc = conn_connect(conn);
+ if (CONN_OPERATION_FAILED == crc ){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to
connect "
+ "to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_TRANSIENT_ERROR);
+ } else if (CONN_SSL_NOT_ENABLED == crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to
acquire "
+ "repl agmt connection (%s), errror %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_FATAL_ERROR);
+ } else {
+ conn_cancel_linger(conn);
+ crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, payload, NULL,
&send_msgid);
+ if (CONN_OPERATION_SUCCESS != crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to
send "
+ "cleanruv extended op to repl agmt (%s), error %d\n",
slapi_sdn_get_dn(dn), crc);
+ } else {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task:
successfully sent "
+ "cleanruv extended op to (%s)\n",slapi_sdn_get_dn(dn));
+ }
+ conn_start_linger(conn);
+ }
+ if(crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: replica
(%s) has not "
+ "been cleaned. You will need to rerun the CLEANALLRUV task on this
replica.\n", slapi_sdn_get_dn(dn));
+ rc = crc;
+ }
+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+ }
+
+done:
+
+ if(payload)
+ ber_bvfree(payload);
+
+ slapi_ch_free_string(&ridstr);
+
+ /*
+ * Now run the cleanruv task
+ */
+ replica_execute_cleanruv_task (r, rid, returntext);
+
+ if(rc == 0){
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: operation
successful\n");
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: operation
failed (%d)\n",rc);
+ }
+
+ return rc;
+}
+
+static int
+replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext)
+{
+ Repl_Connection *conn;
+ Replica *replica = (Replica*)object_get_data (r);
+ Object *agmt_obj;
+ Repl_Agmt *agmt;
+ ConnResult crc;
+ const Slapi_DN *dn = NULL;
+ struct berval *payload = NULL;
+ char *ridstr = NULL;
+ int send_msgid = 0;
+ int rc = 0;
+
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: releasing rid
(%d)...\n", rid);
+
+ /*
+ * Set the released rid, and trigger cl trimmming
+ */
+ set_released_rid((int)rid);
+ trigger_cl_trimming();
+ /*
+ * Create payload
+ */
+ ridstr = slapi_ch_smprintf("%d:%s", rid,
slapi_sdn_get_dn(replica_get_root(replica)));
+ payload = create_ruv_payload(ridstr);
+ if(payload == NULL){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to
create ext op payload, aborting op\n");
+ rc = -1;
+ goto done;
+ }
+
+ agmt_obj = agmtlist_get_first_agreement_for_replica (replica);
+ while (agmt_obj)
+ {
+ agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+ dn = agmt_get_dn_byref(agmt);
+ conn = (Repl_Connection *)agmt_get_connection(agmt);
+ if(conn == NULL){
+ /* no connection for this agreement, log error, and move on */
+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+ continue;
+ }
+ crc = conn_connect(conn);
+ if (CONN_OPERATION_FAILED == crc ){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to
connect "
+ "to repl agmt (%s), error %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_TRANSIENT_ERROR);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else if (CONN_SSL_NOT_ENABLED == crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to
acquire "
+ "repl agmt (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else {
+ conn_cancel_linger(conn);
+ crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, payload, NULL,
&send_msgid);
+ if (CONN_OPERATION_SUCCESS != crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to
send "
+ "releaseruv extended op to repl agmt (%s), error %d\n",
slapi_sdn_get_dn(dn), crc);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: successfully
sent "
+ "extended op to (%s)\n",slapi_sdn_get_dn(dn));
+ }
+ conn_start_linger(conn);
+ }
+ if(crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: replica (%s)
has not "
+ "been cleaned. You will need to rerun the RELEASERUV task on this
replica\n",
+ slapi_sdn_get_dn(dn));
+ rc = crc;
+ }
+ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+ }
+
+done:
+ /*
+ * reset the released/clean rid
+ */
+ if(rc == 0){
+ set_released_rid(ALREADY_RELEASED);
+ delete_cleaned_rid();
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: Successfully
released rid (%d)\n", rid);
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: Failed to
release rid (%d), error (%d)\n", rid, rc);
+ }
+
+ if(payload)
+ ber_bvfree(payload);
+
+ slapi_ch_free_string(&ridstr);
+
+ return rc;
+}
+
+static struct berval *
+create_ruv_payload(char *value){
+ struct berval *req_data = NULL;
+ BerElement *tmp_bere = NULL;
+ if ((tmp_bere = der_alloc()) == NULL){
+ goto error;
+ }
+ if (ber_printf(tmp_bere, "{s", value) == -1){
+ goto error;
+ }
+
+ if (ber_printf(tmp_bere, "}") == -1){
+ goto error;
+ }
+
+ if (ber_flatten(tmp_bere, &req_data) == -1){
+ goto error;
+ }
+
+ goto done;
+
+error:
+ if (NULL != req_data){
+ ber_bvfree(req_data);
+ req_data = NULL;
+ }
+
+done:
+ if (NULL != tmp_bere){
+ ber_free(tmp_bere, 1);
+ tmp_bere = NULL;
+ }
+
+ return req_data;
+}
+
+int
+is_cleaned_rid(ReplicaId rid)
+{
+ if(rid == cleaned_rid){
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+void
+set_cleaned_rid( ReplicaId rid )
+{
+ cleaned_rid = rid;
+}
+
+void
+delete_cleaned_rid()
+{
+ cleaned_rid = 0;
+}
+
+int
+get_released_rid()
+{
+ return released_rid;
+}
+
+int
+is_released_rid(int rid)
+{
+ if(rid == released_rid){
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+int
+is_already_released_rid()
+{
+ if(released_rid == ALREADY_RELEASED){
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+void
+set_released_rid( int rid )
+{
+ released_rid = rid;
+}
+
+void
+delete_released_rid()
+{
+ released_rid = 0;
+}
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c
b/ldap/servers/plugins/replication/repl5_ruv.c
index 9a80fcc..9ad5c5a 100644
--- a/ldap/servers/plugins/replication/repl5_ruv.c
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
@@ -863,10 +863,27 @@ ruv_covers_csn_internal(const RUV *ruv, const CSN *csn, PRBool
strict)
{
rid = csn_get_replicaid(csn);
replica = ruvGetReplica (ruv, rid);
+ if((is_released_rid(rid)) || (replica == NULL && is_already_released_rid()) ){
+ /* this is a released rid, so return true */
+ return PR_TRUE;
+ }
if (replica == NULL)
{
- slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn: replica for id
%d not found\n", rid);
- return_value = PR_FALSE;
+ /*
+ * We don't know anything about this replica change in the cl, mark it to be
zapped.
+ * This could of been a previously cleaned ruv, but the server was restarted before
+ * the change could be trimmed.
+ *
+ * Only the change log trimming calls this function with "strict" set. So
we'll return success
+ * if strict is set.
+ */
+ if(strict){
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn: replica for
id %d not found.\n", rid);
+ return_value = PR_TRUE;
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "ruv_covers_csn: replica for
id %d not found.\n", rid);
+ return_value = PR_FALSE;
+ }
}
else
{
@@ -1403,12 +1420,20 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
RUVElement* replica;
char csn_str[CSN_STRSIZE];
int rc = RUV_SUCCESS;
+ int rid = csn_get_replicaid (csn);
PR_ASSERT (ruv && csn);
/* locate ruvElement */
slapi_rwlock_wrlock (ruv->lock);
- replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
+
+ if(is_cleaned_rid(rid)){
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_add_csn_inprogress:
invalid replica ID"
+ "(%d), aborting update\n", rid);
+ /* return success because we want to consume the update, but not perform it */
+ goto done;
+ }
+ replica = ruvGetReplica (ruv, rid);
if (replica == NULL)
{
replica = ruvAddReplicaNoCSN (ruv, csn_get_replicaid (csn), NULL/*purl*/);
@@ -1416,7 +1441,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
{
if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
"ruv_add_csn_inprogress: failed to add replica"
- " that created csn %s\n", csn_as_string (csn,
PR_FALSE, csn_str));
+ " that created csn %s\n", csn_as_string (csn, PR_FALSE,
csn_str));
}
rc = RUV_MEMORY_ERROR;
goto done;
diff --git a/ldap/servers/plugins/replication/repl_extop.c
b/ldap/servers/plugins/replication/repl_extop.c
index 2ff3627..7e2ebbd 100644
--- a/ldap/servers/plugins/replication/repl_extop.c
+++ b/ldap/servers/plugins/replication/repl_extop.c
@@ -71,6 +71,7 @@
*
*/
static int check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv);
+static multimaster_mtnode_extension *replica_config_get_mtnode_by_dn(const char *dn);
static int
encode_ruv (BerElement *ber, const RUV *ruv)
@@ -1260,7 +1261,6 @@ multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb)
{
/* The ruv from the supplier may have changed. Report the change on the
consumer side */
-
replica_update_ruv_consumer(r, connext->supplier_ruv);
}
@@ -1315,6 +1315,345 @@ free_and_return:
}
/*
+ * Return the mtnode extension of the dn
+ */
+static multimaster_mtnode_extension *
+replica_config_get_mtnode_by_dn(const char *dn)
+{
+ Slapi_DN *sdn;
+ mapping_tree_node *mtnode;
+ multimaster_mtnode_extension *ext = NULL;
+
+ sdn = slapi_sdn_new_dn_byval(dn);
+ mtnode = slapi_get_mapping_tree_node_by_dn (sdn);
+ if (mtnode) {
+ /* check if the replica object already exists in the subtree */
+ ext = (multimaster_mtnode_extension *)repl_con_get_ext (REPL_CON_EXT_MTNODE, mtnode);
+ }
+ slapi_sdn_free (&sdn);
+
+ return ext;
+}
+
+/*
+ * Decode the ber element passed to us by the cleanAllRUV task
+ */
+static int
+decode_cleanruv_payload(struct berval *extop_value, char **payload)
+{
+ BerElement *tmp_bere = NULL;
+ int rc = 0;
+
+ if ((tmp_bere = ber_init(extop_value)) == NULL){
+ rc = -1;
+ goto free_and_return;
+ }
+ if (ber_scanf(tmp_bere, "{") == LBER_ERROR){
+ rc = -1;
+ goto free_and_return;
+ }
+ if (ber_get_stringa(tmp_bere, payload) == LBER_DEFAULT){
+ rc = -1;
+ goto free_and_return;
+ }
+
+ if (ber_scanf(tmp_bere, "}") == LBER_ERROR){
+ rc = -1;
+ goto free_and_return;
+ }
+
+free_and_return:
+ if (-1 == rc){
+ slapi_ch_free_string(payload);
+ }
+ if (NULL != tmp_bere){
+ ber_free(tmp_bere, 1);
+ tmp_bere = NULL;
+ }
+ return rc;
+}
+
+/*
+ * Process the REPL_CLEANRUV_OID extended operation.
+ *
+ * The payload consists of the replica ID, and the repl root dn. Since this is
+ * basically a replication operation, it could of originated here and bounced
+ * back from another master. So check the rid against the "cleaned_rid". If
+ * it's a match, then we were already here, and we can just return success.
+ *
+ * Otherwise, we the set the cleaned_rid from the payload, fire off extended ops
+ * to all the replica agreements on this replica. Then perform the actual
+ * cleanruv_task on this replica.
+ */
+int
+multimaster_extop_cleanruv(Slapi_PBlock *pb){
+ multimaster_mtnode_extension *mtnode_ext;
+ Repl_Connection *conn;
+ const Slapi_DN *dn;
+ Replica *r = NULL;
+ Object *agmt_obj;
+ Repl_Agmt *agmt;
+ ConnResult crc;
+ struct berval *extop_value;
+ char *extop_oid;
+ char *repl_root;
+ char *payload = NULL;
+ char *iter;
+ int send_msgid = 0;
+ int rid = 0;
+ int rc = 0;
+
+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value);
+
+ if (NULL == extop_oid || strcmp(extop_oid, REPL_CLEANRUV_OID) != 0 ||
+ NULL == extop_value || NULL == extop_value->bv_val){
+ /* something is wrong, error out */
+ return -1;
+ }
+
+ /*
+ * Extract the rid and repl_root from the payload
+ */
+ if(decode_cleanruv_payload(extop_value, &payload)){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to
decode payload. Aborting ext op\n");
+ return -1;
+ }
+ rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
+ repl_root = ldap_utf8strtok_r(iter, ":", &iter);
+
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid
(%d)...\n",rid);
+
+ /*
+ * If we already cleaned this server, just return success
+ */
+ if(is_cleaned_rid(rid)){
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: rid (%d) has
already been cleaned, skipping\n",rid);
+ return rc;
+ } else {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid
(%d)...\n", rid);
+ }
+
+ /*
+ * Get the node, so we can get the replica and its agreements
+ */
+ if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to get
replication node "
+ "from (%s), aborting operation\n", repl_root);
+ return -1;
+ }
+ if (mtnode_ext->replica)
+ object_acquire (mtnode_ext->replica);
+ if (mtnode_ext->replica == NULL){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica is
missing from (%s), "
+ "aborting operation\n",repl_root);
+ rc = LDAP_OPERATIONS_ERROR;
+ goto free_and_return;
+ }
+ r = (Replica*)object_get_data (mtnode_ext->replica);
+ /*
+ * Send out extended ops to each repl agreement
+ */
+ agmt_obj = agmtlist_get_first_agreement_for_replica (r);
+ while (agmt_obj)
+ {
+ agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+ dn = agmt_get_dn_byref(agmt);
+ conn = (Repl_Connection *)agmt_get_connection(agmt);
+ if(conn == NULL){
+ /* no connection for this agreement, move on to the next agmt */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: the replica
(%s), is "
+ "missing the connection. This replica will not be cleaned.\n",
slapi_sdn_get_dn(dn));
+ agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+ continue;
+ }
+ crc = conn_connect(conn);
+ if (CONN_OPERATION_FAILED == crc ){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to
connect "
+ "to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_TRANSIENT_ERROR);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else if (CONN_SSL_NOT_ENABLED == crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to
acquire "
+ "repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_FATAL_ERROR);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else {
+ conn_cancel_linger(conn);
+ crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, extop_value, NULL,
&send_msgid);
+ if (CONN_OPERATION_SUCCESS != crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to
send "
+ "clean_ruv extended op to repl agmt (%s), error %d\n",
slapi_sdn_get_dn(dn), crc);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else {
+ /* success */
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: successfully
sent "
+ "extended op to (%s)\n",slapi_sdn_get_dn(dn) );
+ }
+ conn_start_linger(conn);
+ }
+ if(crc != CONN_OPERATION_SUCCESS){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica (%s)
has not "
+ "been cleaned. You will need to rerun the CLEANALLRUV task on this
replica\n",
+ slapi_sdn_get_dn(dn) );
+ rc = LDAP_OPERATIONS_ERROR;
+ }
+ agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+ }
+
+ /* now clean the ruv */
+ replica_execute_cleanruv_task_ext(mtnode_ext->replica, rid);
+
+free_and_return:
+
+ if(rc == 0){
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaned rid
(%d)\n", rid);
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to
clean rid (%d), error (%d)\n",rid, rc);
+ }
+
+ if (mtnode_ext->replica)
+ object_release (mtnode_ext->replica);
+
+ return rc;
+}
+
+/*
+ * Process the REPL_RELEASERUV_OID extended operation
+ *
+ * Once, all the replicas in the replication farm have been cleaned, then
+ * we need to "release" the cleaned_rid, or else we will reject all updates
+ * that come from that rid until we restart the server.
+ *
+ * We set the cleaned_ruv to zero(invalid rid), and then fire off extended
+ * operations to all of the replicas
+ */
+int
+multimaster_extop_releaseruv(Slapi_PBlock *pb){
+ multimaster_mtnode_extension *mtnode_ext;
+ Repl_Connection *conn;
+ const Slapi_DN *dn;
+ Replica *r = NULL;
+ Object *agmt_obj;
+ Repl_Agmt *agmt;
+ ConnResult crc;
+ struct berval *extop_value;
+ char *payload = NULL;
+ char *extop_oid;
+ char *repl_root;
+ char *iter;
+ int send_msgid = 0;
+ int rid = 0;
+ int rc = -1;
+
+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
+ slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value);
+
+ if (NULL == extop_oid || strcmp(extop_oid, REPL_RELEASERUV_OID) != 0 ||
+ NULL == extop_value || NULL == extop_value->bv_val){
+ /* something is wrong, error out */
+ return -1;
+ }
+
+ if(decode_cleanruv_payload(extop_value, &payload)){
+ slapi_log_error(SLAPI_LOG_FATAL,repl_plugin_name, "releaseruv_extop: failed to
decode payload, aborting ext op\n");
+ return -1;
+ }
+ rid = atoi(ldap_utf8strtok_r(payload, ":", &iter));
+ repl_root = ldap_utf8strtok_r(iter, ":", &iter);
+
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: releasing rid
(%d)...\n",rid);
+ /*
+ * If we already released this ruv, just return.
+ */
+ if(is_released_rid(rid) || is_already_released_rid()){
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: rid (%d) has
already been released, skipping\n",rid);
+ return 0;
+ } else {
+ /* set the released rid, and trigger trimming */
+ set_released_rid((int)rid);
+ trigger_cl_trimming();
+ }
+
+ if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to
get node "
+ "from replication root dn(%s), aborting operation.\n", repl_root);
+ return -1;
+ }
+ if (mtnode_ext->replica)
+ object_acquire (mtnode_ext->replica);
+ if (mtnode_ext->replica == NULL){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica is
missing from (%s), "
+ "aborting operation.\n", repl_root);
+ rc = LDAP_OPERATIONS_ERROR;
+ goto free_and_return;
+ }
+ r = (Replica*)object_get_data (mtnode_ext->replica);
+ /*
+ * Loop over the agreements, and send out extended ops
+ */
+ agmt_obj = agmtlist_get_first_agreement_for_replica (r);
+ while (agmt_obj)
+ {
+ agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+ dn = agmt_get_dn_byref(agmt);
+ conn = (Repl_Connection *)agmt_get_connection(agmt);
+ if(conn == NULL){
+ /* no connection for this agreement, log error, and move on */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: the replica
(%s), is "
+ "missing the connection. This replica will not be cleaned.\n",
slapi_sdn_get_dn(dn));
+ agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+ continue;
+ }
+ crc = conn_connect(conn);
+ if (CONN_OPERATION_FAILED == crc ){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to
connect "
+ "to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_TRANSIENT_ERROR);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else if (CONN_SSL_NOT_ENABLED == crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to
acquire "
+ "repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn),
ACQUIRE_FATAL_ERROR);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else {
+ conn_cancel_linger(conn);
+ crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, extop_value, NULL,
&send_msgid);
+ if (CONN_OPERATION_SUCCESS != crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to
send "
+ "releaseruv extended op to repl agmt (%s), error
%d\n", slapi_sdn_get_dn(dn), crc);
+ rc = LDAP_OPERATIONS_ERROR;
+ } else {
+ /* success */
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop:
successfully sent "
+ "extended op to (%s)\n",slapi_sdn_get_dn(dn) );
+ rc = 0;
+ }
+ conn_start_linger(conn);
+ }
+ if(crc){
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica
(%s) has not "
+ "been cleaned. You will need to rerun the RELEASERUV task on this
replica\n",
+ slapi_sdn_get_dn(dn) );
+ }
+ agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+ }
+
+free_and_return:
+ /*
+ * Set the rid as "ALREADY_RELEASED, and remove the cleaned ruv
+ */
+ if(rc == 0){
+ set_released_rid(ALREADY_RELEASED);
+ delete_cleaned_rid();
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: released rid
(%d) successfully\n", rid);
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to
release rid(%d), error (%d), please retry the task\n",rid, rc);
+ }
+
+ if(mtnode_ext->replica)
+ object_release (mtnode_ext->replica);
+
+ return rc;
+}
+
+/*
* This plugin entry point is a noop entry
* point. It's used when registering extops that
* are only used as responses. We'll never receive