From 86ed16008655da8f5b398a78cffd9a2b5e3176f8 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 24 Mar 2022 15:39:22 +0000 Subject: [PATCH] add backoff to checkpoint write (#13902) (#13931) This commit changes `queue.checkpoint.retry` to `true` by default allowing retry of checkpoint write failure. Add exponential backoff retry to checkpoint write to mitigate AccessDeniedExcpetion in Windows. Fixed: #12345 (cherry picked from commit 1a5030bd63b7220746471d927d7240d6cfcd38c8) Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> --- docs/static/settings-file.asciidoc | 4 +- logstash-core/lib/logstash/environment.rb | 2 +- .../ackedqueue/io/FileCheckpointIO.java | 16 +++-- .../org/logstash/util/CheckedSupplier.java | 6 ++ .../org/logstash/util/ExponentialBackoff.java | 65 +++++++++++++++++++ .../logstash/util/ExponentialBackoffTest.java | 39 +++++++++++ 6 files changed, 122 insertions(+), 10 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/util/CheckedSupplier.java create mode 100644 logstash-core/src/main/java/org/logstash/util/ExponentialBackoff.java create mode 100644 logstash-core/src/test/java/org/logstash/util/ExponentialBackoffTest.java diff --git a/docs/static/settings-file.asciidoc b/docs/static/settings-file.asciidoc index 9ee0ea89458..2e9cb077e19 100644 --- a/docs/static/settings-file.asciidoc +++ b/docs/static/settings-file.asciidoc @@ -219,8 +219,8 @@ Values other than `disabled` are currently considered BETA, and may produce unin | 1024 | `queue.checkpoint.retry` -| When enabled, Logstash will retry once per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. -| `false` +| When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. +| `true` | `queue.drain` | When enabled, Logstash waits until the persistent queue is drained before shutting down. diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 8fe861a7f09..747f31343c9 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -89,7 +89,7 @@ module Environment Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited Setting::Numeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing - Setting::Boolean.new("queue.checkpoint.retry", false), + Setting::Boolean.new("queue.checkpoint.retry", true), Setting::Boolean.new("dead_letter_queue.enable", false), Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"), Setting::Numeric.new("dead_letter_queue.flush_interval", 5000), diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java index 4e0f8866dc4..27239ad8057 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/FileCheckpointIO.java @@ -32,6 +32,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.ackedqueue.Checkpoint; +import org.logstash.util.ExponentialBackoff; /** @@ -70,6 +71,7 @@ public class FileCheckpointIO implements CheckpointIO { private static final String HEAD_CHECKPOINT = "checkpoint.head"; private static final String TAIL_CHECKPOINT = "checkpoint."; private final Path dirPath; + private final ExponentialBackoff backoff; public FileCheckpointIO(Path dirPath) { this(dirPath, false); @@ -78,6 +80,7 @@ public FileCheckpointIO(Path dirPath) { public FileCheckpointIO(Path dirPath, boolean retry) { this.dirPath = dirPath; this.retry = retry; + this.backoff = new ExponentialBackoff(3L); } @Override @@ -104,20 +107,19 @@ public void write(String fileName, Checkpoint checkpoint) throws IOException { out.getFD().sync(); } + // Windows can have problem doing file move See: https://github.com/elastic/logstash/issues/12345 + // retry a couple of times to make it works. The first two runs has no break. The rest of reties are exponential backoff. try { Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE); } catch (IOException ex) { if (retry) { try { - logger.error("Retrying after exception writing checkpoint: " + ex); - Thread.sleep(500); - Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE); - } catch (IOException | InterruptedException ex2) { - logger.error("Aborting after second exception writing checkpoint: " + ex2); - throw ex; + backoff.retryable(() -> Files.move(tmpPath, dirPath.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)); + } catch (ExponentialBackoff.RetryException re) { + throw new IOException("Error writing checkpoint", re); } } else { - logger.error("Error writing checkpoint: " + ex); + logger.error("Error writing checkpoint without retry: " + ex); throw ex; } } diff --git a/logstash-core/src/main/java/org/logstash/util/CheckedSupplier.java b/logstash-core/src/main/java/org/logstash/util/CheckedSupplier.java new file mode 100644 index 00000000000..df81ffc5af4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/util/CheckedSupplier.java @@ -0,0 +1,6 @@ +package org.logstash.util; + +@FunctionalInterface +public interface CheckedSupplier { + T get() throws Exception; +} diff --git a/logstash-core/src/main/java/org/logstash/util/ExponentialBackoff.java b/logstash-core/src/main/java/org/logstash/util/ExponentialBackoff.java new file mode 100644 index 00000000000..8c96f7cfe0d --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/util/ExponentialBackoff.java @@ -0,0 +1,65 @@ +package org.logstash.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Random; + +public class ExponentialBackoff { + private final long maxRetry; + private static final int[] BACKOFF_SCHEDULE_MS = {100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 25_600, 51_200}; + private static final int BACKOFF_MAX_MS = 60_000; + + private static final Logger logger = LogManager.getLogger(ExponentialBackoff.class); + + public ExponentialBackoff(long maxRetry) { + this.maxRetry = maxRetry; + } + + public T retryable(CheckedSupplier action) throws RetryException { + long attempt = 0L; + + do { + try { + attempt++; + return action.get(); + } catch (Exception ex) { + logger.error("Backoff retry exception", ex); + } + + if (hasRetry(attempt)) { + try { + int ms = backoffTime(attempt); + logger.info("Retry({}) will execute in {} second", attempt, ms/1000.0); + Thread.sleep(ms); + } catch (InterruptedException e) { + throw new RetryException("Backoff retry aborted", e); + } + } + } while (hasRetry(attempt)); + + throw new RetryException("Reach max retry"); + } + + private int backoffTime(Long attempt) { + return (attempt - 1 < BACKOFF_SCHEDULE_MS.length)? + BACKOFF_SCHEDULE_MS[attempt.intValue() - 1] + new Random().nextInt(1000) : + BACKOFF_MAX_MS; + } + + private boolean hasRetry(long attempt) { + return attempt <= maxRetry; + } + + public static class RetryException extends Exception { + private static final long serialVersionUID = 1L; + + public RetryException(String message) { + super(message); + } + + public RetryException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/logstash-core/src/test/java/org/logstash/util/ExponentialBackoffTest.java b/logstash-core/src/test/java/org/logstash/util/ExponentialBackoffTest.java new file mode 100644 index 00000000000..4663e4b0c27 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/util/ExponentialBackoffTest.java @@ -0,0 +1,39 @@ +package org.logstash.util; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; + +public class ExponentialBackoffTest { + @Test + public void testWithoutException() throws Exception { + ExponentialBackoff backoff = new ExponentialBackoff(1L); + CheckedSupplier supplier = () -> 1 + 1; + Assertions.assertThatCode(() -> backoff.retryable(supplier)).doesNotThrowAnyException(); + Assertions.assertThat(backoff.retryable(supplier)).isEqualTo(2); + } + + @Test + @SuppressWarnings("unchecked") + public void testOneException() throws Exception { + ExponentialBackoff backoff = new ExponentialBackoff(1L); + CheckedSupplier supplier = Mockito.mock(CheckedSupplier.class); + Mockito.when(supplier.get()).thenThrow(new IOException("can't write to disk")).thenReturn(true); + Boolean b = backoff.retryable(supplier); + Assertions.assertThat(b).isEqualTo(true); + } + + @Test + @SuppressWarnings("unchecked") + public void testExceptionsReachMaxRetry() throws Exception { + ExponentialBackoff backoff = new ExponentialBackoff(2L); + CheckedSupplier supplier = Mockito.mock(CheckedSupplier.class); + Mockito.when(supplier.get()).thenThrow(new IOException("can't write to disk")); + Assertions.assertThatThrownBy(() -> backoff.retryable(supplier)) + .isInstanceOf(ExponentialBackoff.RetryException.class) + .hasMessageContaining("max retry"); + } + +} \ No newline at end of file