forked from elastic/logstash
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add backoff to checkpoint write (elastic#13902)
This commit changes `queue.checkpoint.retry` to `true` by default allowing retry of checkpoint write failure. Add exponential backoff retry to checkpoint write to mitigate AccessDeniedExcpetion in Windows. Fixed: elastic#12345
- Loading branch information
1 parent
7df02cc
commit 1f2ed45
Showing
6 changed files
with
122 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
6 changes: 6 additions & 0 deletions
6
logstash-core/src/main/java/org/logstash/util/CheckedSupplier.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package org.logstash.util; | ||
|
||
@FunctionalInterface | ||
public interface CheckedSupplier<T> { | ||
T get() throws Exception; | ||
} |
65 changes: 65 additions & 0 deletions
65
logstash-core/src/main/java/org/logstash/util/ExponentialBackoff.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package org.logstash.util; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
import java.util.Random; | ||
|
||
public class ExponentialBackoff { | ||
private final long maxRetry; | ||
private static final int[] BACKOFF_SCHEDULE_MS = {100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200}; | ||
private static final int BACKOFF_MAX_MS = 60_000; | ||
|
||
private static final Logger logger = LogManager.getLogger(ExponentialBackoff.class); | ||
|
||
public ExponentialBackoff(long maxRetry) { | ||
this.maxRetry = maxRetry; | ||
} | ||
|
||
public <T> T retryable(CheckedSupplier<T> action) throws RetryException { | ||
long attempt = 0L; | ||
|
||
do { | ||
try { | ||
attempt++; | ||
return action.get(); | ||
} catch (Exception ex) { | ||
logger.error("Backoff retry exception", ex); | ||
} | ||
|
||
if (hasRetry(attempt)) { | ||
try { | ||
int ms = backoffTime(attempt); | ||
logger.info("Retry({}) will execute in {} second", attempt, ms/1000.0); | ||
Thread.sleep(ms); | ||
} catch (InterruptedException e) { | ||
throw new RetryException("Backoff retry aborted", e); | ||
} | ||
} | ||
} while (hasRetry(attempt)); | ||
|
||
throw new RetryException("Reach max retry"); | ||
} | ||
|
||
private int backoffTime(Long attempt) { | ||
return (attempt - 1 < BACKOFF_SCHEDULE_MS.length)? | ||
BACKOFF_SCHEDULE_MS[attempt.intValue() - 1] + new Random().nextInt(1000) : | ||
BACKOFF_MAX_MS; | ||
} | ||
|
||
private boolean hasRetry(long attempt) { | ||
return attempt <= maxRetry; | ||
} | ||
|
||
public static class RetryException extends Exception { | ||
private static final long serialVersionUID = 1L; | ||
|
||
public RetryException(String message) { | ||
super(message); | ||
} | ||
|
||
public RetryException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
logstash-core/src/test/java/org/logstash/util/ExponentialBackoffTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package org.logstash.util; | ||
|
||
import org.assertj.core.api.Assertions; | ||
import org.junit.Test; | ||
import org.mockito.Mockito; | ||
|
||
import java.io.IOException; | ||
|
||
public class ExponentialBackoffTest { | ||
@Test | ||
public void testWithoutException() throws Exception { | ||
ExponentialBackoff backoff = new ExponentialBackoff(1L); | ||
CheckedSupplier<Integer> supplier = () -> 1 + 1; | ||
Assertions.assertThatCode(() -> backoff.retryable(supplier)).doesNotThrowAnyException(); | ||
Assertions.assertThat(backoff.retryable(supplier)).isEqualTo(2); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
public void testOneException() throws Exception { | ||
ExponentialBackoff backoff = new ExponentialBackoff(1L); | ||
CheckedSupplier<Boolean> supplier = Mockito.mock(CheckedSupplier.class); | ||
Mockito.when(supplier.get()).thenThrow(new IOException("can't write to disk")).thenReturn(true); | ||
Boolean b = backoff.retryable(supplier); | ||
Assertions.assertThat(b).isEqualTo(true); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
public void testExceptionsReachMaxRetry() throws Exception { | ||
ExponentialBackoff backoff = new ExponentialBackoff(2L); | ||
CheckedSupplier<Boolean> supplier = Mockito.mock(CheckedSupplier.class); | ||
Mockito.when(supplier.get()).thenThrow(new IOException("can't write to disk")); | ||
Assertions.assertThatThrownBy(() -> backoff.retryable(supplier)) | ||
.isInstanceOf(ExponentialBackoff.RetryException.class) | ||
.hasMessageContaining("max retry"); | ||
} | ||
|
||
} |