[389-commits] ldap/servers

Mark Reynolds mreynolds at fedoraproject.org
Wed Apr 25 02:17:46 UTC 2012


 ldap/servers/plugins/replication/cl5_api.c              |  104 ++++
 ldap/servers/plugins/replication/cl5_api.h              |    4 
 ldap/servers/plugins/replication/repl5.h                |   18 
 ldap/servers/plugins/replication/repl5_agmt.c           |    9 
 ldap/servers/plugins/replication/repl5_init.c           |   63 ++
 ldap/servers/plugins/replication/repl5_plugins.c        |   10 
 ldap/servers/plugins/replication/repl5_replica_config.c |  344 +++++++++++++++-
 ldap/servers/plugins/replication/repl5_ruv.c            |   33 +
 ldap/servers/plugins/replication/repl_extop.c           |  341 +++++++++++++++
 9 files changed, 897 insertions(+), 29 deletions(-)

New commits:
commit 0f50544b9567907edd0ba645951d7cd325354107
Author: root <root at localhost.localdomain>
Date:   Mon Apr 23 13:36:04 2012 -0400

    Ticket #337 - RFE - Improve CLEANRUV functionality
    
    Bug Description:  Previously the steps to remove a replica and its RUV were problematic.
                      I created two new "tasks" to take care of the entire replication environment.
    
    Fix Description:
    
    [1] The new task "CLEANALLRUV<rid>" - run it once on any master
    
        This marks the rid as invalid, which is then used to reject updates to the changelog
        and the database RUV.
        It then sends a "CLEANRUV" extended operation to each agreement.
        Then it cleans its own RUV.
    
        The CLEANRUV extended op then triggers that replica to send the same CLEANRUV extop
        to its replicas, then it cleans its own RID. Basically this operation cascades through
        the entire replication environment.
    
    [2] The "RELEASERUV<rid>" task - run it once on any master
    
        Once the RUVs have been cleaned on all the replicas, you need to "release" the rid so
        that it can be reused. This operation also cascades through the entire replication
        environment. This also triggers changelog trimming.
    
    For all of this to work correctly, there is a list of steps that needs to be followed. This procedure is attached to the ticket.
    
    https://fedorahosted.org/389/ticket/337
    
    reviewed by: ?

diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
index 2b57f91..03d50ca 100644
--- a/ldap/servers/plugins/replication/cl5_api.c
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -355,6 +355,7 @@ static int  _cl5WriteRUV (CL5DBFile *file, PRBool purge);
 static int  _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge);
 static int  _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
 static int  _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv);
+void trigger_cl_trimming_thread();
 
 /* bakup/recovery, import/export */
 static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op,
@@ -3477,6 +3478,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim)
 		count = 0;
 		txnid = NULL;
 		abort = PR_FALSE;
+		ReplicaId rid;
 
 		/* DB txn lock accessed pages until the end of the transaction. */
 		
@@ -3497,13 +3499,14 @@ static void _cl5TrimFile (Object *obj, long *numToTrim)
 			 * This change can be trimmed if it exceeds purge
 			 * parameters and has been seen by all consumers.
 			 */
+			rid = csn_get_replicaid (op.csn);
 			if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) &&
 				 ruv_covers_csn_strict (ruv, op.csn) )
-        	{
+			{
 				rc = _cl5CurrentDeleteEntry (it);
-				if ( rc == CL5_SUCCESS )
+				if ( rc == CL5_SUCCESS && !is_released_rid(rid))
 				{
-					/* update purge vector */
+					/* update purge vector, unless this is a released rid */
 					rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE);				
 				}
 				if ( rc == CL5_SUCCESS)
@@ -3529,8 +3532,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim)
 				 * the trim forever.
 				 */
 				CSN *maxcsn = NULL;
-				ReplicaId rid;
-
 				rid = csn_get_replicaid (op.csn);
 				ruv_get_largest_csn_for_replica (ruv, rid, &maxcsn);
 				if ( csn_compare (op.csn, maxcsn) != 0 )
@@ -3620,10 +3621,10 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
 		*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
 		return ( *numToTrim > 0 );
 	}
-    
+
     if (s_cl5Desc.dbTrim.maxEntries > 0 &&
 		(*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
-        return PR_TRUE;
+    	return PR_TRUE;
 
 	if (time)
 		return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
@@ -3805,6 +3806,7 @@ static int  _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge)
     void *iterator = NULL;
     slapi_operation_parameters op = {0};
     CL5DBFile *file;
+    ReplicaId rid;
 
     PR_ASSERT (replGen && obj);
 
@@ -3828,6 +3830,15 @@ static int  _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge)
     rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL);
     while (rc == CL5_SUCCESS)
     {
+        rid = csn_get_replicaid (op.csn);
+        if(is_cleaned_rid(rid)){
+            /* skip this entry as the rid is invalid */
+            slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV: "
+                "skipping entry because its csn contains a cleaned rid(%d)\n", rid);
+            cl5_operation_parameters_done (&op);
+            rc = _cl5GetNextEntry (&entry, iterator);
+            continue;
+        }
         if (purge)
             rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn); 
         else
@@ -3876,28 +3887,32 @@ static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge
 
     file = (CL5DBFile*)object_get_data (obj);
 
-	/* if purge is TRUE, file->purgeRUV must be set;
-	   if purge is FALSE, maxRUV must be set */
+    /*
+     *  if purge is TRUE, file->purgeRUV must be set;
+     *  if purge is FALSE, maxRUV must be set
+     */
     PR_ASSERT (file && ((purge && file->purgeRUV) || (!purge && file->maxRUV)));
+    rid = csn_get_replicaid(csn);
 
     /* update vector only if this replica is not yet part of RUV */
     if (purge && newReplica)
     {
-        rid = csn_get_replicaid(csn);   
         if (ruv_contains_replica (file->purgeRUV, rid))
             return CL5_SUCCESS;
-        else
+        else if(is_cleaned_rid(rid))
         {
-	  /* if the replica is not part of the purgeRUV yet, add it */
-	  ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl());               
+            /* if the replica is not part of the purgeRUV yet, add it unless it's from a cleaned rid */
+            ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl());
         }
     }
     else
     {
         if (purge)
             rc = ruv_set_csns(file->purgeRUV, csn, NULL);
-        else
-            rc = ruv_set_csns(file->maxRUV, csn, NULL); 
+        else if(is_cleaned_rid(rid)){
+            /* don't update maxRUV if rid is cleaned */
+            rc = ruv_set_csns(file->maxRUV, csn, NULL);
+        }
     }
  
     if (rc != RUV_SUCCESS)
@@ -4502,9 +4517,17 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
 	CL5Entry entry;
 	CL5DBFile *file = NULL;
 	Object *file_obj = NULL;
+	ReplicaId rid = csn_get_replicaid (op->csn);
 	DB_TXN *txnid = NULL;
 	DB_TXN *parent_txnid = (DB_TXN *)txn;
 
+	/*
+	 *  If the op csn contains the cleaned rid, don't write it
+	 */
+	if(is_cleaned_rid(rid)){
+		return CL5_SUCCESS;
+	}
+
 	rc = _cl5GetDBFileByReplicaName (replName, replGen, &file_obj);
 	if (rc == CL5_NOTFOUND)
 	{
@@ -4770,7 +4793,7 @@ static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_
 				rc, db_strerror(rc));
 	rc = CL5_DB_ERROR;
 
-done:;
+done:
 	/* error occured */
 	/* We didn't success in assigning this cursor to the iterator,
 	 * so we need to free the cursor here */
@@ -6512,3 +6535,52 @@ bail:
     changelog5_config_done(&config);
     return rc;
 }
+
+/*
+ *  Clean the in memory RUV, at shutdown we will write the update to the db
+ */
+void
+cl5CleanRUV(ReplicaId rid){
+    CL5DBFile *file;
+    Object *obj;
+
+    obj = objset_first_obj(s_cl5Desc.dbFiles);
+    while (obj){
+        file = (CL5DBFile *)object_get_data(obj);
+        ruv_delete_replica(file->purgeRUV, rid);
+        ruv_delete_replica(file->maxRUV, rid);
+        obj = objset_next_obj(s_cl5Desc.dbFiles, obj);
+    }
+
+    if (obj)
+        object_release (obj);
+}
+
+void trigger_cl_trimming(){
+    PRThread *trim_tid = NULL;
+    ReplicaId rid = get_released_rid();
+
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_trimming: rid (%d)\n",(int)rid);
+    trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_trimming_thread,
+                   NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+                   PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
+    if (NULL == trim_tid){
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+            "trigger_cl_trimming: failed to create trimming "
+            "thread; NSPR error - %d\n", PR_GetError ());
+    } else {
+        /* need a little time for the thread to get started */
+        DS_Sleep(PR_SecondsToInterval(1));
+    }
+}
+
+void
+trigger_cl_trimming_thread(){
+    /* make sure we have a change log, and we aren't closing it */
+    if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
+        return;
+    }
+    _cl5AddThread();
+    _cl5DoTrimming();
+    _cl5RemoveThread();
+}
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
index 6f2552f..b9c3dd8 100644
--- a/ldap/servers/plugins/replication/cl5_api.h
+++ b/ldap/servers/plugins/replication/cl5_api.h
@@ -487,4 +487,8 @@ int cl5WriteRUV();
    Return:		TRUE
 */
 int cl5DeleteRUV();
+void cl5CleanRUV(ReplicaId rid);
+void cl5NotifyCleanup(int rid);
+void trigger_cl_trimming();
+
 #endif
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index ac2cd88..e315150 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -94,6 +94,9 @@
  * new set of start and response extops. */
 #define REPL_START_NSDS90_REPLICATION_REQUEST_OID "2.16.840.1.113730.3.5.12"
 #define REPL_NSDS90_REPLICATION_RESPONSE_OID "2.16.840.1.113730.3.5.13"
+/* cleanruv/releaseruv extended ops */
+#define REPL_CLEANRUV_OID "2.16.840.1.113730.3.6.5"
+#define REPL_RELEASERUV_OID "2.16.840.1.113730.3.6.6"
 
 
 /* DS 5.0 replication protocol error codes */
@@ -218,6 +221,8 @@ char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
 /* In repl_extop.c */
 int multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb);
 int multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb);
+int multimaster_extop_cleanruv(Slapi_PBlock *pb);
+int multimaster_extop_releaseruv(Slapi_PBlock *pb);
 int extop_noop(Slapi_PBlock *pb);
 struct berval *NSDS50StartReplicationRequest_new(const char *protocol_oid,
 	const char *repl_root, char **extra_referrals, CSN *csn);
@@ -345,8 +350,8 @@ char **agmt_get_fractional_attrs_total(const Repl_Agmt *ra);
 char **agmt_validate_replicated_attributes(Repl_Agmt *ra, int total);
 void* agmt_get_priv (const Repl_Agmt *agmt);
 void agmt_set_priv (Repl_Agmt *agmt, void* priv);
-
 int get_agmt_agreement_type ( Repl_Agmt *agmt);
+void* agmt_get_connection( Repl_Agmt *ra);
 int agmt_has_protocol(Repl_Agmt *agmt);
 
 typedef struct replica Replica;
@@ -579,6 +584,17 @@ void multimaster_be_state_change (void *handle, char *be_name, int old_be_state,
 int replica_config_init();
 void replica_config_destroy ();
 int get_replica_type(Replica *r);
+int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid);
+void set_cleaned_rid(ReplicaId rid);
+void delete_cleaned_rid();
+int is_cleaned_rid(ReplicaId rid);
+int get_released_rid();
+void set_released_rid(int rid);
+int is_released_rid(int rid);
+int is_already_released_rid();
+void delete_released_rid();
+
+#define ALREADY_RELEASED -1
 
 /* replutil.c */
 LDAPControl* create_managedsait_control ();
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
index 1d9affa..8714021 100644
--- a/ldap/servers/plugins/replication/repl5_agmt.c
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -2425,6 +2425,15 @@ ReplicaId agmt_get_consumerRID(Repl_Agmt *ra)
 	return ra->consumerRID;
 }
 
+void* agmt_get_connection(Repl_Agmt *ra)
+{
+	if(ra->protocol){
+		return (void *)prot_get_connection(ra->protocol);
+	} else {
+		return NULL;
+	}
+}
+
 int
 agmt_has_protocol(Repl_Agmt *agmt)
 {
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
index 4e2464c..a878b19 100644
--- a/ldap/servers/plugins/replication/repl5_init.c
+++ b/ldap/servers/plugins/replication/repl5_init.c
@@ -118,6 +118,22 @@ static char *response_name_list[] = {
 		NSDS_REPL_NAME_PREFIX " Response",
 		NULL
 };
+static char *cleanruv_oid_list[] = {
+		REPL_CLEANRUV_OID,
+		NULL
+};
+static char *cleanruv_name_list[] = {
+		NSDS_REPL_NAME_PREFIX " Cleanruv",
+		NULL
+};
+static char *releaseruv_oid_list[] = {
+		REPL_RELEASERUV_OID,
+		NULL
+};
+static char *releaseruv_name_list[] = {
+		NSDS_REPL_NAME_PREFIX " Releaseruv",
+		NULL
+};
 
 /* List of plugin identities for every plugin registered. Plugin identity
    is passed by the server in the plugin init function and must be supplied 
@@ -434,6 +450,51 @@ multimaster_response_extop_init( Slapi_PBlock *pb )
     return rc;
 }
 
+int
+multimaster_cleanruv_extop_init( Slapi_PBlock *pb )
+{
+	int rc= 0; /* OK */
+	void *identity = NULL;
+
+	/* get plugin identity and store it to pass to internal operations */
+	slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity);
+	PR_ASSERT (identity);
+
+	if (slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION, SLAPI_PLUGIN_VERSION_01 ) != 0 ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION, (void *)&multimasterextopdesc ) != 0 ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_OIDLIST, (void *)cleanruv_oid_list ) != 0  ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_NAMELIST, (void *)cleanruv_name_list ) != 0  ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_FN, (void *)multimaster_extop_cleanruv ))
+	{
+		slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name, "multimaster_cleanruv_extop_init failed\n" );
+		rc= -1;
+	}
+
+	return rc;
+}
+
+int
+multimaster_releaseruv_extop_init( Slapi_PBlock *pb )
+{
+	int rc= 0; /* OK */
+	void *identity = NULL;
+
+	/* get plugin identity and store it to pass to internal operations */
+	slapi_pblock_get (pb, SLAPI_PLUGIN_IDENTITY, &identity);
+	PR_ASSERT (identity);
+
+	if (slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION, SLAPI_PLUGIN_VERSION_01 ) != 0 ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION, (void *)&multimasterextopdesc ) != 0 ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_OIDLIST, (void *)releaseruv_oid_list ) != 0  ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_NAMELIST, (void *)releaseruv_name_list ) != 0  ||
+		slapi_pblock_set( pb, SLAPI_PLUGIN_EXT_OP_FN, (void *)multimaster_extop_releaseruv ))
+	{
+		slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name, "multimaster_releaseruv_extop_init failed\n" );
+		rc= -1;
+	}
+
+	return rc;
+}
 
 static PRBool
 check_for_ldif_dump(Slapi_PBlock *pb)
@@ -618,6 +679,8 @@ int replication_multimaster_plugin_init(Slapi_PBlock *pb)
 		rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_end_extop_init", multimaster_end_extop_init, "Multimaster replication end extended operation plugin", NULL, identity);
 		rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_total_extop_init", multimaster_total_extop_init, "Multimaster replication total update extended operation plugin", NULL, identity);
 		rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_response_extop_init", multimaster_response_extop_init, "Multimaster replication extended response plugin", NULL, identity);
+		rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_cleanruv_extop_init", multimaster_cleanruv_extop_init, "Multimaster replication cleanruv extended operation plugin", NULL, identity);
+		rc= slapi_register_plugin("extendedop", 1 /* Enabled */, "multimaster_releaseruv_extop_init", multimaster_releaseruv_extop_init, "Multimaster replication releaserid extended response plugin", NULL, identity);
 		if (0 == rc)
 		{
 			multimaster_initialised = 1;
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
index c806c08..dfbb80e 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -1005,6 +1005,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	r = (Replica*)object_get_data (repl_obj);
 	PR_ASSERT (r);
 
+	/*
+	 *  In case we had to run cleanruv, we don't want to continue to write
+	 *  updates to the changelog/database ruv from that replica(rid).
+	 */
+	if( is_cleaned_rid(replica_get_rid(r))){
+		/* this RID has been cleaned, just goto done */
+		goto done;
+	}
+
 	if (replica_is_flag_set (r, REPLICA_LOG_CHANGES) &&
 		(cl5GetState () == CL5_STATE_OPEN))
 	{
@@ -1111,6 +1120,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 		update_ruv_component(r, opcsn, pb);
 	}
 
+done:
 	object_release (repl_obj);
 	return return_value;
 }
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
index 94f4179..78a948b 100644
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
@@ -56,9 +56,15 @@
 #define LDIF2CL_TASK        "LDIF2CL"
 #define CLEANRUV            "CLEANRUV"
 #define CLEANRUVLEN         8
+#define CLEANALLRUV         "CLEANALLRUV"
+#define CLEANALLRUVLEN      11
+#define RELEASERUV          "RELEASERUV"
+#define RELEASERUVLEN       10
 #define REPLICA_RDN         "cn=replica"
 
 int slapi_log_urp = SLAPI_LOG_REPL;
+static ReplicaId cleaned_rid = 0;
+static int released_rid = 0;
 
 /* Forward Declartions */
 static int replica_config_add (Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg);
@@ -74,6 +80,9 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt
 static int replica_execute_cl2ldif_task (Object *r, char *returntext);
 static int replica_execute_ldif2cl_task (Object *r, char *returntext);
 static int replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext);
+static int replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext);
+static int replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext);
+static struct berval *create_ruv_payload(char *value);
 static int replica_cleanup_task (Object *r, const char *task_name, char *returntext, int apply_mods);
 static int replica_task_done(Replica *replica);
 												
@@ -825,10 +834,8 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt
 	{
 		int temprid = atoi(&(task_name[CLEANRUVLEN]));
 		if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
-			PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE,
-						"Invalid replica id for task - %s", task_name);
-			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
-							"replica_execute_task: %s\n", returntext);
+			PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for task - %s", temprid, task_name);
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_execute_task: %s\n", returntext);
 			return LDAP_OPERATIONS_ERROR;
 		}
 		if (apply_mods)
@@ -838,6 +845,36 @@ static int replica_execute_task (Object *r, const char *task_name, char *returnt
 		else
 			return LDAP_SUCCESS;
 	}
+	else if (strncasecmp (task_name, CLEANALLRUV, CLEANALLRUVLEN) == 0)
+	{
+		int temprid = atoi(&(task_name[CLEANALLRUVLEN]));
+		if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
+			PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id (%d) for task - (%s)", temprid, task_name);
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_execute_task: %s\n", returntext);
+			return LDAP_OPERATIONS_ERROR;
+		}
+		if (apply_mods)
+		{
+			return replica_execute_cleanall_ruv_task(r, (ReplicaId)temprid, returntext);
+		}
+		else
+			return LDAP_SUCCESS;
+	}
+	else if (strncasecmp (task_name, RELEASERUV, RELEASERUVLEN) == 0)
+	{
+		int temprid = atoi(&(task_name[RELEASERUVLEN]));
+		if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID){
+			PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "Invalid replica id for task - %s", task_name);
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,"replica_execute_task: %s\n", returntext);
+			return LDAP_OPERATIONS_ERROR;
+		}
+		if (apply_mods)
+		{
+			return replica_execute_release_ruv_task(r, (ReplicaId)temprid, returntext);
+		}
+		else
+			return LDAP_SUCCESS;
+	}
 	else
 	{
         PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, "unsupported replica task - %s", task_name);
@@ -1099,16 +1136,24 @@ _replica_config_get_mtnode_ext (const Slapi_Entry *e)
     return ext;
 }
 
+int
+replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid)
+{
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: calling clean_ruv_ext\n");
+	return replica_execute_cleanruv_task(r, rid, NULL);
+}
+
 static int
-replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext)
+replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not used */)
 {
 	int rc = 0;
 	Object *RUVObj;
 	RUV *local_ruv = NULL;
-    Replica *replica = (Replica*)object_get_data (r);
+	Replica *replica = (Replica*)object_get_data (r);
 
-    PR_ASSERT (replica);
+	PR_ASSERT (replica);
 
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_task: cleaning rid (%d)...\n",(int)rid);
 	RUVObj = replica_get_ruv(replica);
 	PR_ASSERT(RUVObj);
 	local_ruv =  (RUV*)object_get_data (RUVObj);
@@ -1128,10 +1173,295 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext)
 	/* Update Mapping Tree to reflect RUV changes */
 	consumer5_set_mapping_tree_state_for_replica(replica, NULL);
 	
+	/*
+	 *  Clean the changelog RUV's, and set the rids
+	 */
+	cl5CleanRUV(rid);
+	set_cleaned_rid(rid);
+	delete_released_rid();
+
 	if (rc != RUV_SUCCESS){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc);
 		return LDAP_OPERATIONS_ERROR;
 	}
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_task: finished successfully\n");
 	return LDAP_SUCCESS;
 }
 
+static int
+replica_execute_cleanall_ruv_task (Object *r, ReplicaId rid, char *returntext)
+{
+	Repl_Connection *conn;
+	Replica *replica = (Replica*)object_get_data (r);
+	Object *agmt_obj;
+	Repl_Agmt *agmt;
+	ConnResult crc;
+	const Slapi_DN *dn = NULL;
+	struct berval *payload = NULL;
+	char *ridstr = NULL;
+	int send_msgid = 0;
+	int rc = 0;
+
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: cleaning rid (%d)...\n",(int)rid);
+
+	/*
+	 *  Create payload
+	 */
+	ridstr = slapi_ch_smprintf("%d:%s", rid, slapi_sdn_get_dn(replica_get_root(replica)));
+	payload = create_ruv_payload(ridstr);
+	if(payload == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to create ext op payload, aborting task\n");
+		goto done;
+	}
+
+	agmt_obj = agmtlist_get_first_agreement_for_replica (replica);
+	while (agmt_obj)
+	{
+		agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+		dn = agmt_get_dn_byref(agmt);
+		conn = (Repl_Connection *)agmt_get_connection(agmt);
+		if(conn == NULL){
+			/* no connection for this agreement, and move on */
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: the replica (%s), is "
+				"missing the connection.  This replica will not be cleaned.\n", slapi_sdn_get_dn(dn));
+			agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+			continue;
+		}
+		crc = conn_connect(conn);
+		if (CONN_OPERATION_FAILED == crc ){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to connect "
+				"to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR);
+		} else if (CONN_SSL_NOT_ENABLED == crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to acquire "
+				"repl agmt connection (%s), errror %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
+		} else {
+			conn_cancel_linger(conn);
+			crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, payload, NULL, &send_msgid);
+			if (CONN_OPERATION_SUCCESS != crc){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: failed to send "
+					"cleanruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
+			} else {
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: successfully sent "
+					"cleanruv extended op to (%s)\n",slapi_sdn_get_dn(dn));
+			}
+			conn_start_linger(conn);
+		}
+		if(crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: replica (%s) has not "
+				"been cleaned.  You will need to rerun the CLEANALLRUV task on this replica.\n", slapi_sdn_get_dn(dn));
+			rc = crc;
+		}
+		agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+	}
+
+done:
+
+	if(payload)
+		ber_bvfree(payload);
+
+	slapi_ch_free_string(&ridstr);
+
+	/*
+	 *  Now run the cleanruv task
+	 */
+	replica_execute_cleanruv_task (r, rid, returntext);
+
+	if(rc == 0){
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanAllRUV_task: operation successful\n");
+	} else {
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanAllRUV_task: operation failed (%d)\n",rc);
+	}
+
+	return rc;
+}
+
+static int
+replica_execute_release_ruv_task(Object *r, ReplicaId rid, char *returntext)
+{
+	Repl_Connection *conn;
+	Replica *replica = (Replica*)object_get_data (r);
+	Object *agmt_obj;
+	Repl_Agmt *agmt;
+	ConnResult crc;
+	const Slapi_DN *dn = NULL;
+	struct berval *payload = NULL;
+	char *ridstr = NULL;
+	int send_msgid = 0;
+	int rc = 0;
+
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: releasing rid (%d)...\n", rid);
+
+	/*
+	 * Set the released rid, and trigger cl trimming
+	 */
+	set_released_rid((int)rid);
+	trigger_cl_trimming();
+	/*
+	 *  Create payload
+	 */
+	ridstr = slapi_ch_smprintf("%d:%s", rid, slapi_sdn_get_dn(replica_get_root(replica)));
+	payload = create_ruv_payload(ridstr);
+	if(payload == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to create ext op payload, aborting op\n");
+		rc = -1;
+		goto done;
+	}
+
+	agmt_obj = agmtlist_get_first_agreement_for_replica (replica);
+	while (agmt_obj)
+	{
+		agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+		dn = agmt_get_dn_byref(agmt);
+		conn = (Repl_Connection *)agmt_get_connection(agmt);
+		if(conn == NULL){
+			/* no connection for this agreement, log error, and move on */
+			agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+			continue;
+		}
+		crc = conn_connect(conn);
+		if (CONN_OPERATION_FAILED == crc ){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to connect "
+				"to repl agmt (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR);
+			rc = LDAP_OPERATIONS_ERROR;
+		} else if (CONN_SSL_NOT_ENABLED == crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to acquire "
+				"repl agmt (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
+			rc = LDAP_OPERATIONS_ERROR;
+		} else {
+			conn_cancel_linger(conn);
+			crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, payload, NULL, &send_msgid);
+			if (CONN_OPERATION_SUCCESS != crc){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: failed to send "
+					"releaseruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
+				rc = LDAP_OPERATIONS_ERROR;
+			} else {
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: successfully sent "
+					"extended op to (%s)\n",slapi_sdn_get_dn(dn));
+			}
+			conn_start_linger(conn);
+		}
+		if(crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: replica (%s) has not "
+					"been cleaned.  You will need to rerun the RELEASERUV task on this replica\n",
+					slapi_sdn_get_dn(dn));
+			rc = crc;
+		}
+		agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
+	}
+
+done:
+	/*
+	 *  reset the released/clean rid
+	 */
+	if(rc == 0){
+		set_released_rid(ALREADY_RELEASED);
+		delete_cleaned_rid();
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseRUV_task: Successfully released rid (%d)\n", rid);
+	} else {
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseRUV_task: Failed to release rid (%d), error (%d)\n", rid, rc);
+	}
+
+	if(payload)
+		ber_bvfree(payload);
+
+	slapi_ch_free_string(&ridstr);
+
+	return rc;
+}
+
+static struct berval *
+create_ruv_payload(char *value){
+	struct berval *req_data = NULL;
+	BerElement *tmp_bere = NULL;
 
+	if ((tmp_bere = der_alloc()) == NULL){
+		goto error;
+	}
+	if (ber_printf(tmp_bere, "{s", value) == -1){
+		goto error;
+	}
+
+	if (ber_printf(tmp_bere, "}") == -1){
+		goto error;
+	}
+
+	if (ber_flatten(tmp_bere, &req_data) == -1){
+		goto error;
+	}
+
+	goto done;
+
+error:
+	if (NULL != req_data){
+		ber_bvfree(req_data);
+		req_data = NULL;
+	}
+
+done:
+	if (NULL != tmp_bere){
+		ber_free(tmp_bere, 1);
+		tmp_bere = NULL;
+	}
+
+	return req_data;
+}
+
+int
+is_cleaned_rid(ReplicaId rid)
+{
+	if(rid == cleaned_rid){
+		return 1;
+	} else {
+		return 0;
+	}
+}
+
+void
+set_cleaned_rid( ReplicaId rid )
+{
+	cleaned_rid = rid;
+}
+
+void
+delete_cleaned_rid()
+{
+	cleaned_rid = 0;
+}
+
+int
+get_released_rid()
+{
+	return released_rid;
+}
+
+int
+is_released_rid(int rid)
+{
+	if(rid == released_rid){
+		return 1;
+	} else {
+		return 0;
+	}
+}
+
+int
+is_already_released_rid()
+{
+	if(released_rid == ALREADY_RELEASED){
+		return 1;
+	} else {
+		return 0;
+	}
+}
+
+void
+set_released_rid( int rid )
+{
+	released_rid = rid;
+}
+
+void
+delete_released_rid()
+{
+	released_rid = 0;
+}
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
index 9a80fcc..9ad5c5a 100644
--- a/ldap/servers/plugins/replication/repl5_ruv.c
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
@@ -863,10 +863,27 @@ ruv_covers_csn_internal(const RUV *ruv, const CSN *csn, PRBool strict)
 	{
 		rid = csn_get_replicaid(csn);
 		replica = ruvGetReplica (ruv, rid);
+		if((is_released_rid(rid)) || (replica == NULL && is_already_released_rid()) ){
+			/* this is a released rid, so return true */
+			return PR_TRUE;
+		}
 		if (replica == NULL)
 		{
-			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn: replica for id %d not found\n", rid);
-			return_value = PR_FALSE;
+			/*
+			 *  We don't know anything about this replica change in the cl, mark it to be zapped.
+			 *  This could have been a previously cleaned ruv, but the server was restarted before
+			 *  the change could be trimmed.
+			 *
+			 *  Only the change log trimming calls this function with "strict" set.  So we'll return success
+			 *  if strict is set.
+			 */
+			if(strict){
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_covers_csn: replica for id %d not found.\n", rid);
+				return_value = PR_TRUE;
+			} else {
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "ruv_covers_csn: replica for id %d not found.\n", rid);
+				return_value = PR_FALSE;
+			}
 		}
 		else
 		{
@@ -1403,12 +1420,20 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
     RUVElement* replica;
     char csn_str[CSN_STRSIZE];
     int rc = RUV_SUCCESS;
+    int rid = csn_get_replicaid (csn);
 
     PR_ASSERT (ruv && csn);
 
     /* locate ruvElement */
     slapi_rwlock_wrlock (ruv->lock);
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
+
+    if(is_cleaned_rid(rid)){
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_add_csn_inprogress: invalid replica ID"
+            "(%d), aborting update\n", rid);
+        /* return success because we want to consume the update, but not perform it */
+        goto done;
+    }
+    replica = ruvGetReplica (ruv, rid);
     if (replica == NULL)
     {
         replica = ruvAddReplicaNoCSN (ruv, csn_get_replicaid (csn), NULL/*purl*/);
@@ -1416,7 +1441,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
         {
             if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
                 slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "ruv_add_csn_inprogress: failed to add replica"
-                                " that created csn %s\n", csn_as_string (csn, PR_FALSE, csn_str));
+                    " that created csn %s\n", csn_as_string (csn, PR_FALSE, csn_str));
             }
             rc = RUV_MEMORY_ERROR;
             goto done;
diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c
index 2ff3627..7e2ebbd 100644
--- a/ldap/servers/plugins/replication/repl_extop.c
+++ b/ldap/servers/plugins/replication/repl_extop.c
@@ -71,6 +71,7 @@
  *
  */
 static int check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv);
+static multimaster_mtnode_extension *replica_config_get_mtnode_by_dn(const char *dn);
 
 static int 
 encode_ruv (BerElement *ber, const RUV *ruv)
@@ -1260,7 +1261,6 @@ multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb)
 			{
 				/* The ruv from the supplier may have changed. Report the change on the
 					consumer side */
-
 				replica_update_ruv_consumer(r, connext->supplier_ruv);
 			}
 
@@ -1315,6 +1315,345 @@ free_and_return:
 }
 
 /*
+ *  Look up the mapping tree node for "dn" and return the multimaster
+ *  mtnode extension attached to it.  NULL is returned when no mapping
+ *  tree node is found for the dn.
+ */
+static multimaster_mtnode_extension *
+replica_config_get_mtnode_by_dn(const char *dn)
+{
+	multimaster_mtnode_extension *node_ext = NULL;
+	Slapi_DN *lookup_sdn = slapi_sdn_new_dn_byval(dn);
+	mapping_tree_node *node = slapi_get_mapping_tree_node_by_dn (lookup_sdn);
+
+	if (node != NULL) {
+		/* fetch the replication extension hanging off this node */
+		node_ext = (multimaster_mtnode_extension *)repl_con_get_ext (REPL_CON_EXT_MTNODE, node);
+	}
+	/* the temporary sdn was only needed for the lookup */
+	slapi_sdn_free (&lookup_sdn);
+
+	return node_ext;
+}
+
+/*
+ * Decode the BER encoded value "{ string }" sent by the cleanAllRUV task.
+ *
+ * On success 0 is returned and *payload holds the decoded string.
+ * On any decoding failure -1 is returned and *payload is freed
+ * via slapi_ch_free_string().
+ */
+static int
+decode_cleanruv_payload(struct berval *extop_value, char **payload)
+{
+	BerElement *ber = ber_init(extop_value);
+	int rc = -1;
+
+	/* each step is only attempted when the previous one succeeded */
+	if (ber != NULL &&
+	    ber_scanf(ber, "{") != LBER_ERROR &&
+	    ber_get_stringa(ber, payload) != LBER_DEFAULT &&
+	    ber_scanf(ber, "}") != LBER_ERROR){
+		rc = 0;
+	}
+
+	if (rc == -1){
+		slapi_ch_free_string(payload);
+	}
+	if (ber != NULL){
+		ber_free(ber, 1);
+	}
+	return rc;
+}
+
+/*
+ *  Process the REPL_CLEANRUV_OID extended operation.
+ *
+ *  The payload is a string of the form "<rid>:<repl root dn>".  Since this is
+ *  basically a replication operation, it could have originated here and
+ *  bounced back from another master.  So the rid is checked with
+ *  is_cleaned_rid(); if it matches, we were already here, and we can just
+ *  return success.
+ *
+ *  Otherwise, the same extended op is sent over the connection of every
+ *  replica agreement of this replica, and then
+ *  replica_execute_cleanruv_task_ext() is run for the rid on this replica.
+ *
+ *  Returns 0 on success, -1 if the request or its payload is unusable or the
+ *  replication node can not be found, and LDAP_OPERATIONS_ERROR if the
+ *  replica is missing or the op could not be sent to some agreement (in the
+ *  latter case the local clean is still attempted).
+ */
+int
+multimaster_extop_cleanruv(Slapi_PBlock *pb){
+	multimaster_mtnode_extension *mtnode_ext = NULL;
+	Repl_Connection *conn;
+	const Slapi_DN *dn;
+	Replica *r = NULL;
+	Object *agmt_obj;
+	Repl_Agmt *agmt;
+	ConnResult crc;
+	struct berval *extop_value;
+	char *extop_oid;
+	char *repl_root;
+	char *rid_str;
+	char *payload = NULL;
+	char *iter = NULL;
+	int send_msgid = 0;
+	int rid = 0;
+	int rc = 0;
+
+	slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
+	slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value);
+
+	if (NULL == extop_oid || strcmp(extop_oid, REPL_CLEANRUV_OID) != 0 ||
+	    NULL == extop_value || NULL == extop_value->bv_val){
+		/* something is wrong, error out */
+		return -1;
+	}
+
+	/*
+	 *  Extract the rid and repl_root from the payload
+	 */
+	if(decode_cleanruv_payload(extop_value, &payload)){
+		/* the decoder already released the payload on failure */
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to decode payload.  Aborting ext op\n");
+		return -1;
+	}
+	/*
+	 *  The payload comes off the wire, so do not assume both tokens are
+	 *  present: passing a NULL token to atoi() or using a NULL repl_root
+	 *  would crash the server.
+	 */
+	rid_str = ldap_utf8strtok_r(payload, ":", &iter);
+	repl_root = (rid_str != NULL) ? ldap_utf8strtok_r(iter, ":", &iter) : NULL;
+	if(rid_str == NULL || repl_root == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: malformed payload, "
+			"expected \"rid:replication root\".  Aborting ext op\n");
+		rc = -1;
+		goto free_payload;
+	}
+	rid = atoi(rid_str);
+
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid (%d)...\n",rid);
+
+	/*
+	 *  If we already cleaned this server, just return success
+	 */
+	if(is_cleaned_rid(rid)){
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: rid (%d) has already been cleaned, skipping\n",rid);
+		goto free_payload;
+	} else {
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaning rid (%d)...\n", rid);
+	}
+
+	/*
+	 *  Get the node, so we can get the replica and its agreements
+	 */
+	if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to get replication node "
+			"from (%s), aborting operation\n", repl_root);
+		rc = -1;
+		goto free_payload;
+	}
+	if (mtnode_ext->replica)
+		object_acquire (mtnode_ext->replica);
+	if (mtnode_ext->replica == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica is missing from (%s), "
+			"aborting operation\n",repl_root);
+		rc = LDAP_OPERATIONS_ERROR;
+		goto free_and_return;
+	}
+	r = (Replica*)object_get_data (mtnode_ext->replica);
+	/*
+	 * Send out extended ops to each repl agreement
+	 */
+	agmt_obj = agmtlist_get_first_agreement_for_replica (r);
+	while (agmt_obj)
+	{
+		agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+		dn = agmt_get_dn_byref(agmt);
+		conn = (Repl_Connection *)agmt_get_connection(agmt);
+		if(conn == NULL){
+			/* no connection for this agreement, move on to the next agmt */
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: the replica (%s), is "
+				"missing the connection.  This replica will not be cleaned.\n", slapi_sdn_get_dn(dn));
+			agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+			continue;
+		}
+		crc = conn_connect(conn);
+		if (CONN_OPERATION_FAILED == crc ){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to connect "
+				"to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR);
+			rc = LDAP_OPERATIONS_ERROR;
+		} else if (CONN_SSL_NOT_ENABLED == crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to acquire "
+				"repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
+			rc = LDAP_OPERATIONS_ERROR;
+		} else {
+			/* forward the request unchanged, so the op cascades to the next hop */
+			conn_cancel_linger(conn);
+			crc = conn_send_extended_operation(conn, REPL_CLEANRUV_OID, extop_value, NULL, &send_msgid);
+			if (CONN_OPERATION_SUCCESS != crc){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to send "
+					"clean_ruv extended op to repl agmt (%s), error %d\n", slapi_sdn_get_dn(dn), crc);
+				rc = LDAP_OPERATIONS_ERROR;
+			} else {
+				/* success */
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: successfully sent "
+					"extended op to (%s)\n",slapi_sdn_get_dn(dn) );
+			}
+			conn_start_linger(conn);
+		}
+		if(crc != CONN_OPERATION_SUCCESS){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: replica (%s) has not "
+				"been cleaned.  You will need to rerun the CLEANALLRUV task on this replica\n",
+				slapi_sdn_get_dn(dn) );
+			rc = LDAP_OPERATIONS_ERROR;
+		}
+		agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+	}
+
+	/* now clean the ruv */
+	replica_execute_cleanruv_task_ext(mtnode_ext->replica, rid);
+
+free_and_return:
+
+	if(rc == 0){
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "cleanruv_extop: cleaned rid (%d)\n", rid);
+	} else {
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_extop: failed to clean rid (%d), error (%d)\n",rid, rc);
+	}
+
+	if (mtnode_ext->replica)
+		object_release (mtnode_ext->replica);
+
+free_payload:
+	/* repl_root and rid_str point into payload, so it is released last */
+	slapi_ch_free_string(&payload);
+
+	return rc;
+}
+
+/*
+ *  Process the REPL_RELEASERUV_OID extended operation
+ *
+ *  Once all the replicas in the replication farm have been cleaned, we
+ *  need to "release" the cleaned_rid, or else we will reject all updates
+ *  that come from that rid until we restart the server.
+ *
+ *  The payload is a string of the form "<rid>:<repl root dn>".  If the rid
+ *  is already released we return success right away.  Otherwise the rid is
+ *  stored as the released rid, changelog trimming is triggered, and the same
+ *  extended op is sent over the connection of every agreement of this
+ *  replica.  When rc ends up 0, released_rid is set to ALREADY_RELEASED and
+ *  the cleaned rid is deleted (reset to zero, the invalid rid).
+ *
+ *  NOTE(review): rc starts at -1 and is overwritten by each agreement that
+ *  is attempted, so a replica without agreements reports failure and the
+ *  last agreement decides the final result - confirm this is intended.
+ */
+int
+multimaster_extop_releaseruv(Slapi_PBlock *pb){
+	multimaster_mtnode_extension *mtnode_ext = NULL;
+	Repl_Connection *conn;
+	const Slapi_DN *dn;
+	Replica *r = NULL;
+	Object *agmt_obj;
+	Repl_Agmt *agmt;
+	ConnResult crc;
+	struct berval *extop_value;
+	char *payload = NULL;
+	char *extop_oid;
+	char *repl_root;
+	char *rid_str;
+	char *iter = NULL;
+	int send_msgid = 0;
+	int rid = 0;
+	int rc = -1;
+
+	slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid);
+	slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value);
+
+	if (NULL == extop_oid || strcmp(extop_oid, REPL_RELEASERUV_OID) != 0 ||
+		NULL == extop_value || NULL == extop_value->bv_val){
+		/* something is wrong, error out */
+		return -1;
+	}
+
+	if(decode_cleanruv_payload(extop_value, &payload)){
+		/* the decoder already released the payload on failure */
+		slapi_log_error(SLAPI_LOG_FATAL,repl_plugin_name, "releaseruv_extop: failed to decode payload, aborting ext op\n");
+		return -1;
+	}
+	/*
+	 *  The payload comes off the wire, so do not assume both tokens are
+	 *  present: passing a NULL token to atoi() or using a NULL repl_root
+	 *  would crash the server.
+	 */
+	rid_str = ldap_utf8strtok_r(payload, ":", &iter);
+	repl_root = (rid_str != NULL) ? ldap_utf8strtok_r(iter, ":", &iter) : NULL;
+	if(rid_str == NULL || repl_root == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: malformed payload, "
+			"expected \"rid:replication root\", aborting ext op\n");
+		rc = -1;
+		goto free_payload;
+	}
+	rid = atoi(rid_str);
+
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: releasing rid (%d)...\n",rid);
+	/*
+	 *  If we already released this ruv, just return.
+	 */
+	if(is_released_rid(rid) || is_already_released_rid()){
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: rid (%d) has already been released, skipping\n",rid);
+		rc = 0;
+		goto free_payload;
+	} else {
+		/* set the released rid, and trigger trimming */
+		set_released_rid((int)rid);
+		trigger_cl_trimming();
+	}
+
+	if((mtnode_ext = replica_config_get_mtnode_by_dn(repl_root)) == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to get node "
+			"from replication root dn(%s), aborting operation.\n", repl_root);
+		rc = -1;
+		goto free_payload;
+	}
+	if (mtnode_ext->replica)
+		object_acquire (mtnode_ext->replica);
+	if (mtnode_ext->replica == NULL){
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica is missing from (%s), "
+			"aborting operation.\n", repl_root);
+		rc = LDAP_OPERATIONS_ERROR;
+		goto free_and_return;
+	}
+	r = (Replica*)object_get_data (mtnode_ext->replica);
+	/*
+	 *  Loop over the agreements, and send out extended ops
+	 */
+	agmt_obj = agmtlist_get_first_agreement_for_replica (r);
+	while (agmt_obj)
+	{
+		agmt = (Repl_Agmt*)object_get_data (agmt_obj);
+		dn = agmt_get_dn_byref(agmt);
+		conn = (Repl_Connection *)agmt_get_connection(agmt);
+		if(conn == NULL){
+			/* no connection for this agreement, log error, and move on */
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: the replica (%s), is "
+				"missing the connection.  This replica will not be cleaned.\n", slapi_sdn_get_dn(dn));
+			agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+			continue;
+		}
+		crc = conn_connect(conn);
+		if (CONN_OPERATION_FAILED == crc ){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to connect "
+				"to repl agreement connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_TRANSIENT_ERROR);
+			rc = LDAP_OPERATIONS_ERROR;
+		} else if (CONN_SSL_NOT_ENABLED == crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to acquire "
+				"repl agmt connection (%s), error %d\n",slapi_sdn_get_dn(dn), ACQUIRE_FATAL_ERROR);
+			rc = LDAP_OPERATIONS_ERROR;
+		} else {
+			/* forward the request unchanged, so the op cascades to the next hop */
+			conn_cancel_linger(conn);
+			crc = conn_send_extended_operation(conn, REPL_RELEASERUV_OID, extop_value, NULL, &send_msgid);
+			if (CONN_OPERATION_SUCCESS != crc){
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to send "
+					"releaseruv extended op to repl agmt (%s), error %d\n",	slapi_sdn_get_dn(dn), crc);
+				rc = LDAP_OPERATIONS_ERROR;
+			} else {
+				/* success */
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: successfully sent "
+					"extended op to (%s)\n",slapi_sdn_get_dn(dn) );
+				rc = 0;
+			}
+			conn_start_linger(conn);
+		}
+		if(crc){
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: replica (%s) has not "
+				"been cleaned.  You will need to rerun the RELEASERUV task on this replica\n",
+				slapi_sdn_get_dn(dn) );
+		}
+		agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj);
+	}
+
+free_and_return:
+	/*
+	 *  Set the rid as "ALREADY_RELEASED", and remove the cleaned rid
+	 */
+	if(rc == 0){
+		set_released_rid(ALREADY_RELEASED);
+		delete_cleaned_rid();
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "releaseruv_extop: released rid (%d) successfully\n", rid);
+	} else {
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "releaseruv_extop: failed to release rid(%d), error (%d), please retry the task\n",rid, rc);
+	}
+
+	if(mtnode_ext->replica)
+		object_release (mtnode_ext->replica);
+
+free_payload:
+	/* repl_root and rid_str point into payload, so it is released last */
+	slapi_ch_free_string(&payload);
+
+	return rc;
+}
+
+/*
  * This plugin entry point is a noop entry
  * point. It's used when registering extops that
  * are only used as responses. We'll never receive




More information about the 389-commits mailing list