add backoff to checkpoint write (elastic#13902)
This commit changes `queue.checkpoint.retry` to `true` by default, allowing failed checkpoint writes to be retried.
It adds an exponential backoff retry to the checkpoint write to mitigate `AccessDeniedException` on Windows.
Fixed: elastic#12345
kaisecheng committed Mar 24, 2022
1 parent 94a7aa3 commit 74114ac
Showing 6 changed files with 122 additions and 10 deletions.
4 changes: 2 additions & 2 deletions docs/static/settings-file.asciidoc
@@ -211,8 +211,8 @@ Values other than `disabled` are currently considered BETA, and may produce unin
| 1024

| `queue.checkpoint.retry`
- | When enabled, Logstash will retry once per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances.
- | `false`
+ | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows or on filesystems with non-standard behavior such as SANs, and it is not recommended except in those specific circumstances.
+ | `true`

| `queue.drain`
| When enabled, Logstash waits until the persistent queue is drained before shutting down.
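For reference, a minimal `logstash.yml` sketch exercising the new `queue.checkpoint.retry` default (the `path.queue` value is hypothetical, and the setting is spelled out even though `true` is now the shipped default):

    queue.type: persisted                  # checkpointing only applies to the persistent queue
    path.queue: /var/lib/logstash/queue    # hypothetical queue directory
    queue.checkpoint.retry: true           # default as of this change; retries failed checkpoint writes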
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/environment.rb
@@ -87,7 +87,7 @@ module Environment
Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::Boolean.new("queue.checkpoint.retry", false),
Setting::Boolean.new("queue.checkpoint.retry", true),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::Numeric.new("dead_letter_queue.flush_interval", 5000),
logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java
@@ -32,6 +32,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ackedqueue.Checkpoint;
+ import org.logstash.util.ExponentialBackoff;


/**
@@ -70,6 +71,7 @@ public class FileCheckpointIO implements CheckpointIO {
private static final String HEAD_CHECKPOINT = "checkpoint.head";
private static final String TAIL_CHECKPOINT = "checkpoint.";
private final Path dirPath;
+ private final ExponentialBackoff backoff;

public FileCheckpointIO(Path dirPath) {
this(dirPath, false);
@@ -78,6 +80,7 @@ public FileCheckpointIO(Path dirPath) {
public FileCheckpointIO(Path dirPath, boolean retry) {
this.dirPath = dirPath;
this.retry = retry;
+ this.backoff = new ExponentialBackoff(3L);
}

@Override
@@ -104,20 +107,19 @@ public void write(String fileName, Checkpoint checkpoint) throws IOException {
out.getFD().sync();
}

+ // Windows can have problems doing file moves. See: https://github.com/elastic/logstash/issues/12345
+ // Retry a few times to make it work. The first two runs have no pause between them;
+ // the remaining retries use exponential backoff.
  try {
      Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
  } catch (IOException ex) {
      if (retry) {
          try {
              logger.error("Retrying after exception writing checkpoint: " + ex);
-             Thread.sleep(500);
-             Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
-         } catch (IOException | InterruptedException ex2) {
-             logger.error("Aborting after second exception writing checkpoint: " + ex2);
-             throw ex;
+             backoff.retryable(() -> Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE));
+         } catch (ExponentialBackoff.RetryException re) {
+             throw new IOException("Error writing checkpoint", re);
          }
      } else {
-         logger.error("Error writing checkpoint: " + ex);
+         logger.error("Error writing checkpoint without retry: " + ex);
          throw ex;
      }
  }
logstash-core/src/main/java/org/logstash/util/CheckedSupplier.java
@@ -0,0 +1,6 @@
package org.logstash.util;

@FunctionalInterface
public interface CheckedSupplier<T> {
T get() throws Exception;
}
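`CheckedSupplier` exists because `java.util.function.Supplier` cannot declare checked exceptions, so an I/O action could not be passed as a plain `Supplier`. A minimal sketch of the difference (the file name is hypothetical):

    // Would not compile as java.util.function.Supplier<byte[]>, because
    // Files.readAllBytes throws the checked IOException; fine as a CheckedSupplier.
    CheckedSupplier<byte[]> readCheckpoint =
        () -> java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("checkpoint.head"));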
logstash-core/src/main/java/org/logstash/util/ExponentialBackoff.java
@@ -0,0 +1,65 @@
package org.logstash.util;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Random;

public class ExponentialBackoff {
private final long maxRetry;
private static final int[] BACKOFF_SCHEDULE_MS = {100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200};
private static final int BACKOFF_MAX_MS = 60_000;

private static final Logger logger = LogManager.getLogger(ExponentialBackoff.class);

public ExponentialBackoff(long maxRetry) {
this.maxRetry = maxRetry;
}

    /**
     * Runs {@code action} until it returns a value, sleeping between failed
     * attempts according to BACKOFF_SCHEDULE_MS plus up to one second of
     * random jitter. Gives up after maxRetry retries, i.e. maxRetry + 1
     * attempts in total.
     */
    public <T> T retryable(CheckedSupplier<T> action) throws RetryException {
long attempt = 0L;

do {
try {
attempt++;
return action.get();
} catch (Exception ex) {
logger.error("Backoff retry exception", ex);
}

if (hasRetry(attempt)) {
try {
int ms = backoffTime(attempt);
logger.info("Retry({}) will execute in {} second", attempt, ms/1000.0);
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new RetryException("Backoff retry aborted", e);
}
}
} while (hasRetry(attempt));

throw new RetryException("Reached max retry");
}

    // Doubles from 100ms per attempt, with up to 1s of random jitter;
    // attempts past the end of the schedule wait a flat 60s.
    private int backoffTime(Long attempt) {
        return attempt - 1 < BACKOFF_SCHEDULE_MS.length
                ? BACKOFF_SCHEDULE_MS[attempt.intValue() - 1] + new Random().nextInt(1000)
                : BACKOFF_MAX_MS;
    }

private boolean hasRetry(long attempt) {
return attempt <= maxRetry;
}

public static class RetryException extends Exception {
private static final long serialVersionUID = 1L;

public RetryException(String message) {
super(message);
}

public RetryException(String message, Throwable cause) {
super(message, cause);
}
}
}
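A usage sketch under stated assumptions: `BackoffExample`, its `main` wrapper, and the file paths are hypothetical; only `ExponentialBackoff`, `CheckedSupplier`, and `RetryException` come from this commit.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    import org.logstash.util.ExponentialBackoff;

    public class BackoffExample {
        public static void main(String[] args) throws Exception {
            // maxRetry = 3 allows four attempts in total: the initial try plus
            // three retries, sleeping ~100/200/400ms (plus jitter) between them.
            ExponentialBackoff backoff = new ExponentialBackoff(3L);
            Path tmp = Paths.get("checkpoint.head.tmp"); // hypothetical paths
            Path dst = Paths.get("checkpoint.head");
            // retryable() returns the action's result, or throws
            // ExponentialBackoff.RetryException once the retry budget is spent.
            Path moved = backoff.retryable(
                    () -> Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE));
            System.out.println("moved checkpoint to " + moved);
        }
    }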
logstash-core/src/test/java/org/logstash/util/ExponentialBackoffTest.java
@@ -0,0 +1,39 @@
package org.logstash.util;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;

public class ExponentialBackoffTest {
@Test
public void testWithoutException() throws Exception {
ExponentialBackoff backoff = new ExponentialBackoff(1L);
CheckedSupplier<Integer> supplier = () -> 1 + 1;
Assertions.assertThatCode(() -> backoff.retryable(supplier)).doesNotThrowAnyException();
Assertions.assertThat(backoff.retryable(supplier)).isEqualTo(2);
}

@Test
@SuppressWarnings("unchecked")
public void testOneException() throws Exception {
ExponentialBackoff backoff = new ExponentialBackoff(1L);
CheckedSupplier<Boolean> supplier = Mockito.mock(CheckedSupplier.class);
Mockito.when(supplier.get()).thenThrow(new IOException("can't write to disk")).thenReturn(true);
Boolean b = backoff.retryable(supplier);
Assertions.assertThat(b).isEqualTo(true);
}

@Test
@SuppressWarnings("unchecked")
public void testExceptionsReachMaxRetry() throws Exception {
ExponentialBackoff backoff = new ExponentialBackoff(2L);
CheckedSupplier<Boolean> supplier = Mockito.mock(CheckedSupplier.class);
Mockito.when(supplier.get()).thenThrow(new IOException("can't write to disk"));
Assertions.assertThatThrownBy(() -> backoff.retryable(supplier))
.isInstanceOf(ExponentialBackoff.RetryException.class)
.hasMessageContaining("max retry");
}

}
