modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java | 92 +++++-- modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java | 120 ++++++---- 2 files changed, 154 insertions(+), 58 deletions(-)
New commits: commit 845261576ac892a029eac88b66ad7fd44d37d4f5 Author: Jay Shaughnessy jshaughn@redhat.com Date: Tue Dec 3 11:36:48 2013 -0500
Changes to AvailabilityProxy: - support test code tweaking the various configurable values; - save 3 bytes per proxy by making the sync timeout limit a byte type; - add some commented-out dev logging, to be removed later as desired. Work on AvailabilityProxyTest: - add testing for the new async timeout handling; - add testing for the sync disable/enable.
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java index 1bf380f..49a7452 100644 --- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java +++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java @@ -62,9 +62,9 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili * Number of consecutive avail sync timeouts before we assume the resource's avail checking can not meet the async * timeout. At that point stop slowing things down waiting for the timeout and instead, for this resource, * rely only on the async results. In other words, stop trying to report live avail if live avail checking is - * consistently too slow. + * consistently too slow. Max = 127. We use a byte here to save space. */ - private static final int AVAIL_SYNC_TIMEOUT_LIMIT; + private static final byte AVAIL_SYNC_TIMEOUT_LIMIT;
/** * How long to wait for an *async* future to return a resource availability (in ms). @@ -86,14 +86,17 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili } AVAIL_SYNC_TIMEOUT = syncAvailTimeout;
- int syncAvailTimeoutLimit; + byte syncAvailTimeoutLimit; try { // unlikely to be changed but back-door configurable - syncAvailTimeoutLimit = Integer.parseInt(System.getProperty( + syncAvailTimeoutLimit = Byte.parseByte(System.getProperty( "rhq.agent.plugins.availability-scan.sync-timeout-limit", "5")); } catch (Throwable t) { syncAvailTimeoutLimit = 5; } + if (syncAvailTimeoutLimit > 127) { + syncAvailTimeoutLimit = 127; + } AVAIL_SYNC_TIMEOUT_LIMIT = syncAvailTimeoutLimit;
int asyncAvailTimeout; @@ -124,7 +127,7 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili * returned synchronously (within the timeout period). There is currently no way to 'reset' this (short * of agent restart) after it has triggered, meaning the resource will no longer try to report live avail. */ - private int availAsyncConsecutiveTimeouts = 0; + private byte availSyncConsecutiveTimeouts = 0;
private final ClassLoader classLoader;
@@ -155,10 +158,11 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili * being computed. * * @throws TimeoutException - * if timeout occurred during the second call to this method + * if an async check exceeds AVAIL_ASYNC_TIMEOUT */ @Override public AvailabilityType getAvailability() { + // TODO take out DevDebug printlns when we're confident we don't need them AvailabilityType avail = UNKNOWN;
try { @@ -168,27 +172,33 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili if (availabilityFuture.isDone()) { // hold onto and report the last known value if necessary avail = availabilityFuture.get(); + // System.out.println("DevDebug 1 [" + System.currentTimeMillis() + "] future done avail [" + avail.name() + "]");
} else { // We are still waiting on the previously submitted async avail check - let's just return // the last one we got. Note that if the future is not done after a large amount of time, // then it means this thread could somehow be hung or otherwise stuck and not returning. Not good. // In this case, throw a detailed exception to the avail checker. - if ((System.currentTimeMillis() - lastSubmitTime) > AVAIL_ASYNC_TIMEOUT) { + long elapsedTime = System.currentTimeMillis() - lastSubmitTime; + if (elapsedTime > getAsyncTimeout()) { + // System.out.println("DevDebug 2 [" + System.currentTimeMillis() + "] async timeout"); + Throwable t = new Throwable(); if (current != null) { t.setStackTrace(current.getStackTrace()); } - String msg = "Availability check running too long, canceled for [" + resourceComponent - + "]; Stack trace includes the timed out thread's stack trace."; + String msg = "Availability check ran too long [" + elapsedTime + "ms], canceled for [" + + resourceComponent + "]; Stack trace includes the timed out thread's stack trace."; availabilityFuture.cancel(true);
// try again, maybe the situation will resolve in time for the next check availabilityFuture = executor.submit(this); lastSubmitTime = System.currentTimeMillis(); + // System.out.println("DevDebug 3 [" + System.currentTimeMillis() + "] async timeout submit");
throw new TimeoutException(msg, t); } else { + // System.out.println("DevDebug 4 [" + System.currentTimeMillis() + "] no async timeout, return lastAvail [" + lastAvail.name() + "]"); return lastAvail; } } @@ -197,26 +207,32 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili // request a thread to do an avail check availabilityFuture = executor.submit(this); lastSubmitTime = System.currentTimeMillis(); + // System.out.println("DevDebug 5 [" + System.currentTimeMillis() + "] standard submit");
// if we have exceeded the timeout too many times in a row assume that this is a slow // resource and stop performing synchronous checks, which would likely fail to return fast enough anyway. - if (availAsyncConsecutiveTimeouts < AVAIL_SYNC_TIMEOUT_LIMIT) { + if (availSyncConsecutiveTimeouts < getSyncTimeoutLimit()) { // attempt to get availability synchronously - avail = availabilityFuture.get(AVAIL_SYNC_TIMEOUT, TimeUnit.MILLISECONDS); + avail = availabilityFuture.get(getSyncTimeout(), TimeUnit.MILLISECONDS); + // System.out.println("DevDebug 6 [" + System.currentTimeMillis() + "] sync avail [" + avail.name() + "]");
// success (failure will throw exception) - availAsyncConsecutiveTimeouts = 0; + availSyncConsecutiveTimeouts = 0; availabilityFuture = null;
- } else if (availAsyncConsecutiveTimeouts == AVAIL_SYNC_TIMEOUT_LIMIT) { + } else if (availSyncConsecutiveTimeouts == getSyncTimeoutLimit()) { + // System.out.println("DevDebug 7 [" + System.currentTimeMillis() + "] sync disabled"); + // log one time that we are disabling synchronous checks for this resource - ++availAsyncConsecutiveTimeouts; + ++availSyncConsecutiveTimeouts; if (LOG.isDebugEnabled()) { LOG.debug("Disabling synchronous availability collection for [" + resourceComponent + "]; [" - + AVAIL_SYNC_TIMEOUT_LIMIT + "] consective timeouts exceeding [" + AVAIL_SYNC_TIMEOUT + "ms]"); + + getSyncTimeoutLimit() + "] consecutive timeouts exceeding [" + getSyncTimeout() + "ms]"); } } } catch (InterruptedException e) { + // System.out.println("DevDebug 8 [" + System.currentTimeMillis() + "] Interrupted"); + LOG.debug("InterruptedException; shut down is (likely) in progress."); availabilityFuture.cancel(true); availabilityFuture = null; @@ -227,8 +243,10 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili throw new RuntimeException("Availability check failed", e.getCause());
} catch (java.util.concurrent.TimeoutException e) { + // System.out.println("DevDebug 9 [" + System.currentTimeMillis() + "] Sync Timeout"); + // failed to get avail synchronously. next call to the future will return availability (we hope) - ++availAsyncConsecutiveTimeouts; + ++availSyncConsecutiveTimeouts; }
return processAvail(avail); @@ -253,29 +271,61 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili // resource comes up we should give it a chance to respond quickly and provide live avail. if (result != lastAvail) { if (result == UP) { - if (availAsyncConsecutiveTimeouts >= AVAIL_SYNC_TIMEOUT_LIMIT) { + if (availSyncConsecutiveTimeouts >= getSyncTimeoutLimit()) { + // System.out.println("DevDebug 10 [" + System.currentTimeMillis() + "] Enabling Sync"); + if (LOG.isDebugEnabled()) { LOG.debug("Enabling synchronous availability collection for [" + resourceComponent + "]; Availability has just changed from [" + lastAvail + "] to UP."); } } - availAsyncConsecutiveTimeouts = 0; + availSyncConsecutiveTimeouts = 0;
} lastAvail = result; }
+ // System.out.println("DevDebug 11 [" + System.currentTimeMillis() + "] returning processAvail [" + result.getName()+ "]"); + return result; }
/** + * Override point. Typically for testing. + * @return something other than the env var setting. + */ + protected long getAsyncTimeout() { + return AVAIL_ASYNC_TIMEOUT; + } + + /** + * Override point. Typically for testing. + * @return something other than the env var setting. + */ + protected long getSyncTimeout() { + return AVAIL_SYNC_TIMEOUT; + } + + /** + * Override point. Typically for testing. + * @return something other than the env var setting. + */ + protected byte getSyncTimeoutLimit() { + return AVAIL_SYNC_TIMEOUT_LIMIT; + } + + protected boolean isSyncDisabled() { + return availSyncConsecutiveTimeouts >= getSyncTimeoutLimit(); + } + + /** * Debug string. */ @Override public String toString() { return "AvailabilityProxy [resourceComponent=" + resourceComponent + ", lastAvail=" + lastAvail - + ", lastSubmitTime=" + new java.util.Date(lastSubmitTime) + ", executor=" - + executor + ", availabilityFuture=" + availabilityFuture + ", current=" + current + ", timeouts=" - + availAsyncConsecutiveTimeouts + "]"; + + ", lastSubmitTime=" + new java.util.Date(lastSubmitTime) + ", executor=" + executor + + ", availabilityFuture=" + availabilityFuture + ", current=" + current + ", timeouts=" + + availSyncConsecutiveTimeouts + "]"; } } diff --git a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java index e8dd78d..e861845 100644 --- a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java +++ b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java @@ -19,7 +19,6 @@ package org.rhq.core.pc.inventory;
import static org.rhq.core.domain.measurement.AvailabilityType.DOWN; -import static org.rhq.core.domain.measurement.AvailabilityType.UNKNOWN; import static org.rhq.core.domain.measurement.AvailabilityType.UP; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.fail; @@ -42,57 +41,70 @@ public class AvailabilityProxyTest implements AvailabilityFacet {
private final Log LOG = LogFactory.getLog(AvailabilityProxyTest.class); private volatile int timeout = 1; - private final AvailabilityType avail = UP; + private AvailabilityType returnedAvail = UP; private final ExecutorService executor = Executors.newCachedThreadPool(); - private volatile boolean interrupted;
/** * Run a test. Note this may not be 100% reliable, as it depends on thread execution to * happen according to our sleep schedule... */ public void test() throws InterruptedException { - AvailabilityProxy ap = new AvailabilityProxy(this, executor, getClass().getClassLoader()); + TestAvailabilityProxy ap = new TestAvailabilityProxy(this, executor, getClass().getClassLoader()); LOG.debug("proxy " + ap); - assertEquals("should be up", avail, ap.getAvailability()); - assertEquals(false, interrupted); - timeout = 1100; - assertEquals("should be down", DOWN, ap.getAvailability()); - Thread.sleep(500); // waited 1.5 sec - assertEquals("should be up now", UP, ap.getAvailability()); - Thread.sleep(1); // waited 1.001 seconds + + assertEquals("should be up", UP, ap.getAvailability()); // waits 1ms and returns synchronously + timeout = 1200; + assertEquals("should be down", DOWN, ap.getAvailability()); // waits 1s and times out + Thread.sleep(300); // now waited total of 1s + .3s = 1.3 sec > 1.2s + assertEquals("should be up now", UP, ap.getAvailability()); // waits 1s and returns last reported value (UP) + + ap.setAsyncTimeout(1020L); + Thread.sleep(50); // waited 1.050 seconds try { - ap.getAvailability(); - fail("should timeout 1"); + ap.getAvailability(); // this submits another which we need to let finish + fail("should timeout 1020, waited 1050"); } catch (TimeoutException e) { } - LOG.debug("proxy " + ap); - assertEquals(true, interrupted); - - LOG.debug("force more timeouts"); - for (int i = 0; i < 5; ++i) { - LOG.debug("timeout " + i); - try { - ap.getAvailability(); - fail("should timeout"); - } catch (TimeoutException e) { - } - } + // wait for the last submit to return + Thread.sleep(1210);
LOG.debug("proxy " + ap);
- timeout = 0; - try { + // try disabling sync checks + // - start returning DOWN avail in order to perform a sync disable and then re-enable + // - go back to default async timeout, we don't want it to trigger anymore + // short timeout but longer than the sync timeout to force several sync timeouts + returnedAvail = DOWN; + ap.setAsyncTimeout(null); + timeout = 75; + ap.setSyncTimeout(50L); + + while (!ap.isSyncDisabled()) { ap.getAvailability(); - fail("should be down, until we wait a little"); - } catch (TimeoutException e) { + Thread.sleep(50L); } - Thread.sleep(100); - assertEquals("should be up now", UP, ap.getAvailability());
+ // go back to returning UP so we can re-enable sync checking + // make the sync check a half second so we can prove that sync checking is not happening + returnedAvail = UP; + timeout = 500; + ap.setSyncTimeout(500L); + long start = System.currentTimeMillis(); + assertEquals("should be DOWN", DOWN, ap.getAvailability()); + assert System.currentTimeMillis() - start < 100 : "Should have been fast, returning old avail"; + // wait for the last submit to return + Thread.sleep(510); + + // check for re-enable sync checks + assertEquals("should be UP", UP, ap.getAvailability()); + assertEquals("should be enabled", false, ap.isSyncDisabled()); + // wait for the last submit to return + Thread.sleep(510); + + // test interrupt handling LOG.debug("interrupt this thread"); Thread.currentThread().interrupt(); - timeout = 1000 * 5; - assertEquals("cancelation", UNKNOWN, ap.getAvailability()); + assertEquals("cancellation", AvailabilityType.UNKNOWN, ap.getAvailability()); assertEquals(true, Thread.currentThread().isInterrupted()); }
@@ -102,10 +114,44 @@ public class AvailabilityProxyTest implements AvailabilityFacet { LOG.debug("sleep " + timeout); Thread.sleep(timeout); } catch (InterruptedException e) { - interrupted = true; Thread.currentThread().interrupt(); } - LOG.debug("return " + avail); - return avail; + LOG.debug("return " + returnedAvail.getName()); + return returnedAvail; + } + + private class TestAvailabilityProxy extends AvailabilityProxy { + + private Long asyncTimeout = null; + private Long syncTimeout = null; + + public TestAvailabilityProxy(AvailabilityFacet resourceComponent, ExecutorService executor, + ClassLoader classLoader) { + super(resourceComponent, executor, classLoader); + } + + @Override + public AvailabilityType getAvailability() { + // System.out.println("DevDebug 0 [" + System.currentTimeMillis() + "] getAvail() timeout=[" + timeout + "], syncTimeout=[" + syncTimeout + "], asyncTimeout=[" + asyncTimeout + "]"); + return super.getAvailability(); + } + + public void setAsyncTimeout(Long asyncTimeout) { + this.asyncTimeout = asyncTimeout; + } + + public void setSyncTimeout(Long syncTimeout) { + this.syncTimeout = syncTimeout; + } + + @Override + protected long getSyncTimeout() { + return null == syncTimeout ? super.getSyncTimeout() : syncTimeout; + } + + @Override + protected long getAsyncTimeout() { + return null == asyncTimeout ? super.getAsyncTimeout() : asyncTimeout; + } } -} \ No newline at end of file +}
rhq-commits@lists.fedorahosted.org