From 5c6b2bf8ecb89efb5cbb70901d4adb0ef459d973 Mon Sep 17 00:00:00 2001 From: sndyuk Date: Sat, 23 Feb 2019 00:37:34 +0900 Subject: [PATCH] Added CloudWatch appender --- pom.xml | 7 + .../logback/more/appenders/AwsAppender.java | 51 +++++ .../appenders/CloudWatchLogbackAppender.java | 215 ++++++++++++++++++ .../more/appenders/CountBasedStreamName.java | 36 +++ .../more/appenders/DaemonAppender.java | 97 -------- .../appenders/DynamoDBLogbackAppender.java | 55 +---- .../more/appenders/IntervalEmitter.java | 56 +++++ .../more/appenders/LogbackAppenderTest.java | 22 +- src/test/resources/logback-appenders.xml | 57 ++++- src/test/resources/logback.xml | 1 + 10 files changed, 449 insertions(+), 148 deletions(-) create mode 100644 src/main/java/ch/qos/logback/more/appenders/AwsAppender.java create mode 100644 src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java create mode 100644 src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java delete mode 100644 src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java create mode 100644 src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java diff --git a/pom.xml b/pom.xml index b2ca795..5a8823f 100644 --- a/pom.xml +++ b/pom.xml @@ -127,6 +127,13 @@ true + + com.amazonaws + aws-java-sdk-logs + ${aws.version} + true + + junit junit diff --git a/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java b/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java new file mode 100644 index 0000000..7f2c48d --- /dev/null +++ b/src/main/java/ch/qos/logback/more/appenders/AwsAppender.java @@ -0,0 +1,51 @@ +package ch.qos.logback.more.appenders; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.PropertiesCredentials; +import ch.qos.logback.core.UnsynchronizedAppenderBase; + +public abstract class AwsAppender extends UnsynchronizedAppenderBase { + + protected AwsConfig config; + protected AWSCredentials credentials; + + @Override + public void start() { + try { + super.start(); + if (config.getCredentialFilePath() != null + && config.getCredentialFilePath().length() > 0) { + this.credentials = new PropertiesCredentials(getClass().getClassLoader() + .getResourceAsStream(config.getCredentialFilePath())); + } else { + this.credentials = + DefaultAWSCredentialsProviderChain.getInstance().getCredentials(); + } + } catch (Exception e) { + addWarn("Could not initialize " + AwsAppender.class.getCanonicalName() + + " ( will try to initialize again later ): " + e); + } + } + + public static class AwsConfig { + private String credentialFilePath; + private String region; + + public void setCredentialFilePath(String credentialFilePath) { + this.credentialFilePath = credentialFilePath; + } + + public String getCredentialFilePath() { + return credentialFilePath; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + } +} diff --git a/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java new file mode 100644 index 0000000..eaadf8a --- /dev/null +++ b/src/main/java/ch/qos/logback/more/appenders/CloudWatchLogbackAppender.java @@ -0,0 +1,215 @@ +package ch.qos.logback.more.appenders; + +import java.util.List; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.services.logs.AWSLogs; +import com.amazonaws.services.logs.AWSLogsClientBuilder; +import com.amazonaws.services.logs.model.CreateLogGroupRequest; +import com.amazonaws.services.logs.model.CreateLogStreamRequest; +import com.amazonaws.services.logs.model.DescribeLogGroupsRequest; +import com.amazonaws.services.logs.model.DescribeLogGroupsResult; +import com.amazonaws.services.logs.model.DescribeLogStreamsRequest; +import com.amazonaws.services.logs.model.DescribeLogStreamsResult; +import com.amazonaws.services.logs.model.InputLogEvent; +import com.amazonaws.services.logs.model.LogGroup; +import com.amazonaws.services.logs.model.LogStream; +import com.amazonaws.services.logs.model.PutLogEventsRequest; +import com.amazonaws.services.logs.model.PutLogEventsResult; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.encoder.EchoEncoder; +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.more.appenders.IntervalEmitter.EventMapper; +import ch.qos.logback.more.appenders.IntervalEmitter.IntervalAppender; + +public class CloudWatchLogbackAppender extends AwsAppender { + + private IntervalEmitter emitter; + private AWSLogs awsLogs; + private String logGroupName; + private StreamName logStreamName; + private boolean createLogDestination; + private long emitInterval = 10000; + private Encoder encoder = new EchoEncoder(); + + public void setAwsConfig(AwsConfig config) { + this.config = config; + } + + public void setLogGroupName(String logGroupName) { + this.logGroupName = logGroupName; + } + + public void setLogStreamName(String logStreamName) { + this.logStreamName = new StaticStreamName(logStreamName); + } + + public void setLogStreamRolling(StreamName streamName) { + this.logStreamName = streamName; + } + + public void setCreateLogDestination(boolean createLogDestination) { + this.createLogDestination = createLogDestination; + } + + public void setEmitInterval(long emitInterval) { + this.emitInterval = emitInterval; + } + + public void setEncoder(Encoder encoder) { + this.encoder = encoder; + } + + @Override + public void start() { + super.start(); + if (logGroupName == null || logGroupName.length() == 0 || logStreamName == null) { + throw new IllegalArgumentException("logGroupName and logStreamName must be defined."); + } + this.awsLogs = AWSLogsClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(config.getRegion()).build(); + this.emitter = new IntervalEmitter(emitInterval, + new CloudWatchEventMapper(), new CloudWatchIntervalAppender()); + } + + @Override + public void stop() { + try { + emitter.emit(); + } catch (Exception e) { + // Ignore + } + try { + super.stop(); + } finally { + try { + awsLogs.shutdown(); + } catch (Exception e) { + // pass + } + } + } + + @Override + protected void append(E eventObject) { + emitter.append(eventObject); + } + + private void ensureLogGroup() { + DescribeLogGroupsRequest request = + new DescribeLogGroupsRequest().withLogGroupNamePrefix(logGroupName); + DescribeLogGroupsResult result = awsLogs.describeLogGroups(request); + for (LogGroup group : result.getLogGroups()) { + if (logGroupName.equals(group.getLogGroupName())) { + return; + } + } + if (createLogDestination) { + awsLogs.createLogGroup(new CreateLogGroupRequest(logGroupName)); + } else { + throw new IllegalStateException( + "The specified log group does not exist: " + logGroupName); + } + } + + private String ensureLogStream(String name) { + DescribeLogStreamsRequest request = new DescribeLogStreamsRequest() + .withLogGroupName(logGroupName).withLogStreamNamePrefix(name); + DescribeLogStreamsResult result = awsLogs.describeLogStreams(request); + for (LogStream stream : result.getLogStreams()) { + if (name.equals(stream.getLogStreamName())) { + return stream.getUploadSequenceToken(); + } + } + if (createLogDestination) { + awsLogs.createLogStream(new CreateLogStreamRequest(logGroupName, name)); + return null; + } else { + throw new IllegalStateException( + "The specified log stream does not exist: " + logStreamName); + } + } + + private final class CloudWatchEventMapper implements EventMapper { + + @Override + public InputLogEvent map(E event) { + InputLogEvent logEvent = new InputLogEvent(); + if (event instanceof ILoggingEvent) { + ILoggingEvent loggingEvent = (ILoggingEvent) event; + logEvent.setTimestamp(loggingEvent.getTimeStamp()); + } else { + logEvent.setTimestamp(System.currentTimeMillis()); + } + logEvent.setMessage(new String(encoder.encode(event))); + return logEvent; + } + } + + private final class CloudWatchIntervalAppender implements IntervalAppender { + private String sequenceToken; + private boolean initialized = false; + private boolean switchingStream = false; + private String currentStreamName; + + @Override + public boolean append(List events) { + if (!initialized) { + synchronized (this) { + if (!initialized) { + ensureLogGroup(); + initialized = true; + } + } + return false; + } + if (switchingStream) { + return false; + } + String streamName = logStreamName.get(events); + if (!streamName.equals(currentStreamName)) { + switchingStream = true; + synchronized (this) { + if (switchingStream) { + sequenceToken = ensureLogStream(streamName); + currentStreamName = streamName; + switchingStream = false; + } else { + return false; + } + } + } + try { + PutLogEventsRequest request = + new PutLogEventsRequest(logGroupName, streamName, events); + if (sequenceToken != null) { + request.withSequenceToken(sequenceToken); + } + PutLogEventsResult result = awsLogs.putLogEvents(request); + sequenceToken = result.getNextSequenceToken(); + return true; + } catch (RuntimeException e) { + sequenceToken = null; + e.printStackTrace(); + throw e; + } + } + } + + protected interface StreamName { + String get(List events); + } + + public static class StaticStreamName implements StreamName { + private String name; + + public StaticStreamName(String name) { + this.name = name; + } + + @Override + public String get(List events) { + return name; + } + } +} diff --git a/src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java b/src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java new file mode 100644 index 0000000..0ca72b4 --- /dev/null +++ b/src/main/java/ch/qos/logback/more/appenders/CountBasedStreamName.java @@ -0,0 +1,36 @@ +package ch.qos.logback.more.appenders; + +import java.util.List; +import java.util.UUID; +import com.amazonaws.services.logs.model.InputLogEvent; +import ch.qos.logback.more.appenders.CloudWatchLogbackAppender.StreamName; + +public class CountBasedStreamName implements StreamName { + private long count = 0; + private long limit = 1000; + private String baseName = ""; + private String currentName; + + public void setBaseName(String baseName) { + this.baseName = baseName; + } + + public void setLimit(long limit) { + this.limit = limit; + this.count = limit + 1; + } + + @Override + public String get(List events) { + count += events.size(); + if (count > limit) { + synchronized (this) { + if (count > limit) { + currentName = baseName + UUID.randomUUID(); + count = events.size(); + } + } + } + return currentName; + } +} diff --git a/src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java b/src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java deleted file mode 100644 index 310e794..0000000 --- a/src/main/java/ch/qos/logback/more/appenders/DaemonAppender.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright (c) 2012 sndyuk - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ch.qos.logback.more.appenders; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Use UnsynchronizedAppenderBase instead. - */ -@Deprecated -public abstract class DaemonAppender implements Runnable { - private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool(); - - private static final Logger LOG = LoggerFactory.getLogger(DaemonAppender.class); - - private AtomicBoolean start = new AtomicBoolean(false); - private final BlockingQueue queue; - - DaemonAppender(int maxQueueSize) { - this.queue = new LinkedBlockingQueue(maxQueueSize); - } - - protected void execute() { - if (THREAD_POOL.isShutdown()) { - THREAD_POOL = Executors.newCachedThreadPool(); - } - THREAD_POOL.execute(this); - } - - void log(E eventObject) { - if (!queue.offer(eventObject)) { - LOG.warn("Message queue is full. Ignored the message:" + System.lineSeparator() + eventObject.toString()); - } else if (start.compareAndSet(false, true)) { - execute(); - } - } - - @Override - public void run() { - - try { - for (;;) { - append(queue.take()); - } - } catch (InterruptedException e) { - // ignore the error and rerun. - run(); - } catch (Exception e) { - close(); - } - } - - abstract protected void append(E rawData); - - protected void close() { - synchronized (THREAD_POOL) { - if (!THREAD_POOL.isShutdown()) { - shutdownAndAwaitTermination(THREAD_POOL); - } - } - } - - private static void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); - try { - if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { - pool.shutdownNow(); - if (!pool.awaitTermination(60, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - pool.shutdownNow(); - Thread.currentThread().interrupt(); - } - } -} diff --git a/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java b/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java index 860e199..e7dd802 100644 --- a/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java +++ b/src/main/java/ch/qos/logback/more/appenders/DynamoDBLogbackAppender.java @@ -13,59 +13,40 @@ */ package ch.qos.logback.more.appenders; -import static ch.qos.logback.core.CoreConstants.CODES_URL; import java.util.HashMap; import java.util.List; import java.util.Map; import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.PropertiesCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.PutItemRequest; import com.amazonaws.services.dynamodbv2.model.QueryRequest; import com.amazonaws.services.dynamodbv2.model.QueryResult; -import ch.qos.logback.core.Layout; -import ch.qos.logback.core.UnsynchronizedAppenderBase; import ch.qos.logback.core.encoder.EchoEncoder; import ch.qos.logback.core.encoder.Encoder; -import ch.qos.logback.core.encoder.LayoutWrappingEncoder; -public class DynamoDBLogbackAppender extends UnsynchronizedAppenderBase { +public class DynamoDBLogbackAppender extends AwsAppender { private Encoder encoder = new EchoEncoder(); - @Deprecated - public void setLayout(Layout layout) { - addWarn("This appender no longer admits a layout as a sub-component, set an encoder instead."); - addWarn("To ensure compatibility, wrapping your layout in LayoutWrappingEncoder."); - addWarn("See also " + CODES_URL + "#layoutInsteadOfEncoder for details"); - LayoutWrappingEncoder lwe = new LayoutWrappingEncoder(); - lwe.setLayout(layout); - lwe.setContext(context); - this.encoder = lwe; - } - public void setEncoder(Encoder encoder) { this.encoder = encoder; } + public void setAwsConfig(AwsConfig config) { + this.config = config; + } + @Override public void start() { try { super.start(); - PropertiesCredentials credentials = new PropertiesCredentials( - getClass().getClassLoader().getResourceAsStream(dynamodbCredentialFilePath)); this.dynamoClient = AmazonDynamoDBClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials)) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - dynamodbEndpoint, dynamodbRegion)) - .build(); - this.id = getLastId(outputTableName, instanceName, dynamoClient); + .withRegion(config.getRegion()).build(); } catch (Exception e) { - addWarn("Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName() - + " ( will try to initialize again later ): " + e); + addError("Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName(), e); } } @@ -87,8 +68,8 @@ protected void append(E event) { if (this.id == -1) { this.id = getLastId(outputTableName, instanceName, dynamoClient); if (this.id == -1) { - addWarn("Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName() - + " ( will try to initialize again later ): "); + addError( + "Could not initialize " + DynamoDBLogbackAppender.class.getCanonicalName()); return; } } @@ -97,7 +78,8 @@ protected void append(E event) { data.put("id", new AttributeValue().withN(String.valueOf(++id))); data.put("msg", new AttributeValue().withS(new String(encoder.encode(event)))); - PutItemRequest itemRequest = new PutItemRequest().withTableName(outputTableName).withItem(data); + PutItemRequest itemRequest = + new PutItemRequest().withTableName(outputTableName).withItem(data); dynamoClient.putItem(itemRequest); } @@ -117,26 +99,11 @@ private static long getLastId(String tableName, String instanceName, } } - private String dynamodbCredentialFilePath; - private String dynamodbEndpoint; - private String dynamodbRegion; private String outputTableName; private String instanceName; private long id = -1; private AmazonDynamoDB dynamoClient; - public void setDynamodbCredentialFilePath(String dynamodbCredentialFilePath) { - this.dynamodbCredentialFilePath = dynamodbCredentialFilePath; - } - - public void setDynamodbEndpoint(String dynamodbEndpoint) { - this.dynamodbEndpoint = dynamodbEndpoint; - } - - public void setDynamodbRegion(String dynamodbRegion) { - this.dynamodbRegion = dynamodbRegion; - } - public void setOutputTableName(String outputTableName) { this.outputTableName = outputTableName; } diff --git a/src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java b/src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java new file mode 100644 index 0000000..8391c7f --- /dev/null +++ b/src/main/java/ch/qos/logback/more/appenders/IntervalEmitter.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2012 sndyuk + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package ch.qos.logback.more.appenders; + +import java.util.ArrayList; +import java.util.List; + +public class IntervalEmitter { + private long lastEmit = -1; + private long maxInterval; + private final List events; + private final EventMapper eventMapper; + private final IntervalAppender appender; + + IntervalEmitter(long maxInterval, EventMapper eventMapper, IntervalAppender appender) { + this.events = new ArrayList(); + this.maxInterval = maxInterval; + this.eventMapper = eventMapper; + this.appender = appender; + } + + void append(E event) { + events.add(eventMapper.map(event)); + + long now = System.currentTimeMillis(); + if (now > lastEmit + maxInterval) { + emit(); + lastEmit = now; + } + } + + public void emit() { + if (appender.append(events)) { + events.clear(); + } + } + + public interface EventMapper { + R map(E event); + } + + public interface IntervalAppender { + boolean append(List events); + } +} diff --git a/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java b/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java index 7ba3b53..92d3578 100644 --- a/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java +++ b/src/test/java/ch/qos/logback/more/appenders/LogbackAppenderTest.java @@ -18,18 +18,35 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; +import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.more.appenders.marker.MapMarker; public class LogbackAppenderTest { private static final Logger LOG = LoggerFactory.getLogger(LogbackAppenderTest.class); + @Before + public void before() { + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + if (!lc.isStarted()) { + lc.start(); + } + } + + @After + public void after() { + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + lc.stop(); + } + @Test public void logSimple() throws InterruptedException { @@ -100,7 +117,10 @@ public void logNestedMapMarker() throws InterruptedException { MapMarker mapMarker = new MapMarker("MAP_MARKER", map); notifyMarker.add(mapMarker); - LOG.debug(notifyMarker, "Test the nested marker map."); + for (int i = 0; i < 100; i++) { + LOG.debug(notifyMarker, "Test the nested marker map." + i); + } + Thread.sleep(1000); // Wait a moment because these log is being appended asynchronous... } diff --git a/src/test/resources/logback-appenders.xml b/src/test/resources/logback-appenders.xml index be888c1..aba3160 100644 --- a/src/test/resources/logback-appenders.xml +++ b/src/test/resources/logback-appenders.xml @@ -2,14 +2,56 @@ + + + + + AwsCredentials.properties + ap-northeast-1 + + test-log + + + + 100000 + + + + true + + 100 + + + + + + + + + 999 + + true + + 100 + + + + - - AwsCredentials.properties - - dynamodb.ap-northeast-1.amazonaws.com - ap-northeast-1 + + + AwsCredentials.properties + ap-northeast-1 + + AppLog - 10000 + 1000 + @@ -74,6 +117,7 @@ + debug @@ -138,6 +182,7 @@ + diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 4fbc5bc..7a3b4b7 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -5,6 +5,7 @@ +