diff --git a/.changes/next-release/feature-AWSSDKforJavav2-c5f66b2.json b/.changes/next-release/feature-AWSSDKforJavav2-c5f66b2.json new file mode 100644 index 000000000000..d8f0781e82e2 --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-c5f66b2.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "scrocquesel", + "description": "Add client configuration overriding of SCHEDULED_EXECUTOR_SERVICE option" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java index c91ad39ad1a3..18fcc1e52f2e 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java @@ -91,6 +91,7 @@ import software.amazon.awssdk.profiles.ProfileFileSystemSetting; import software.amazon.awssdk.utils.AttributeMap; import software.amazon.awssdk.utils.Either; +import software.amazon.awssdk.utils.ScheduledExecutorUtils; import software.amazon.awssdk.utils.ThreadFactoryBuilder; import software.amazon.awssdk.utils.Validate; @@ -222,6 +223,7 @@ private SdkClientConfiguration setOverrides(SdkClientConfiguration configuration SdkClientConfiguration.Builder builder = configuration.toBuilder(); + builder.option(SCHEDULED_EXECUTOR_SERVICE, clientOverrideConfiguration.scheduledExecutorService().orElse(null)); builder.option(EXECUTION_INTERCEPTORS, clientOverrideConfiguration.executionInterceptors()); builder.option(RETRY_POLICY, clientOverrideConfiguration.retryPolicy().orElse(null)); builder.option(ADDITIONAL_HTTP_HEADERS, clientOverrideConfiguration.headers()); @@ -313,7 +315,7 @@ private SdkClientConfiguration finalizeAsyncConfiguration(SdkClientConfiguration private SdkClientConfiguration finalizeConfiguration(SdkClientConfiguration config) { RetryPolicy retryPolicy = resolveRetryPolicy(config); return config.toBuilder() - .option(SCHEDULED_EXECUTOR_SERVICE, resolveScheduledExecutorService()) + .option(SCHEDULED_EXECUTOR_SERVICE, resolveScheduledExecutorService(config)) .option(EXECUTION_INTERCEPTORS, resolveExecutionInterceptors(config)) .option(RETRY_POLICY, retryPolicy) .option(CLIENT_USER_AGENT, resolveClientUserAgent(config, retryPolicy)) @@ -410,9 +412,17 @@ private Executor resolveAsyncFutureCompletionExecutor(SdkClientConfiguration con * Finalize the internal SDK scheduled executor service that is used for scheduling tasks such * as async retry attempts and timeout task. */ - private ScheduledExecutorService resolveScheduledExecutorService() { - return Executors.newScheduledThreadPool(5, new ThreadFactoryBuilder() - .threadNamePrefix("sdk-ScheduledExecutor").build()); + private ScheduledExecutorService resolveScheduledExecutorService(SdkClientConfiguration config) { + Supplier defaultScheduledExecutor = () -> { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(5, new ThreadFactoryBuilder() + .threadNamePrefix("sdk-ScheduledExecutor").build()); + + return executor; + }; + + return Optional.ofNullable(config.option(SCHEDULED_EXECUTOR_SERVICE)) + .map(ScheduledExecutorUtils::unmanagedScheduledExecutor) + .orElseGet(defaultScheduledExecutor); } /** diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/ClientOverrideConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/ClientOverrideConfiguration.java index 4ba034413b90..83cf2317038d 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/ClientOverrideConfiguration.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/config/ClientOverrideConfiguration.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.annotations.ToBuilderIgnoreField; @@ -62,6 +63,7 @@ public final class ClientOverrideConfiguration private final String defaultProfileName; private final List metricPublishers; private final ExecutionAttributes executionAttributes; + private final ScheduledExecutorService scheduledExecutorService; /** * Initialize this configuration. Private to require use of {@link #builder()}. @@ -77,6 +79,7 @@ private ClientOverrideConfiguration(Builder builder) { this.defaultProfileName = builder.defaultProfileName(); this.metricPublishers = Collections.unmodifiableList(new ArrayList<>(builder.metricPublishers())); this.executionAttributes = ExecutionAttributes.unmodifiableExecutionAttributes(builder.executionAttributes()); + this.scheduledExecutorService = builder.scheduledExecutorService(); } @Override @@ -92,7 +95,8 @@ public Builder toBuilder() { .defaultProfileFile(defaultProfileFile) .defaultProfileName(defaultProfileName) .executionAttributes(executionAttributes) - .metricPublishers(metricPublishers); + .metricPublishers(metricPublishers) + .scheduledExecutorService(scheduledExecutorService); } /** @@ -141,6 +145,17 @@ public List executionInterceptors() { return executionInterceptors; } + /** + * The optional scheduled executor service that should be used for scheduling tasks such as async retry attempts + * and timeout task. + *

+ * The SDK will not automatically close the executor when the client is closed. It is the responsibility of the + * user to manually close the executor once all clients utilizing it have been closed. + */ + public Optional scheduledExecutorService() { + return Optional.ofNullable(scheduledExecutorService); + } + /** * The amount of time to allow the client to complete the execution of an API call. This timeout covers the entire client * execution except for marshalling. This includes request handler execution, all HTTP requests including retries, @@ -226,6 +241,7 @@ public String toString() { .add("advancedOptions", advancedOptions) .add("profileFile", defaultProfileFile) .add("profileName", defaultProfileName) + .add("scheduledExecutorService", scheduledExecutorService) .build(); } @@ -338,6 +354,20 @@ default Builder retryPolicy(RetryMode retryMode) { List executionInterceptors(); + /** + * Configure the scheduled executor service that should be used for scheduling tasks such as async retry attempts + * and timeout task. + * + *

+ * The SDK will not automatically close the executor when the client is closed. It is the responsibility of the + * user to manually close the executor once all clients utilizing it have been closed. + * + * @see ClientOverrideConfiguration#scheduledExecutorService() + */ + Builder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService); + + ScheduledExecutorService scheduledExecutorService(); + /** * Configure an advanced override option. These values are used very rarely, and the majority of SDK customers can ignore * them. @@ -499,6 +529,7 @@ private static final class DefaultClientOverrideConfigurationBuilder implements private String defaultProfileName; private List metricPublishers = new ArrayList<>(); private ExecutionAttributes.Builder executionAttributes = ExecutionAttributes.builder(); + private ScheduledExecutorService scheduledExecutorService; @Override public Builder headers(Map> headers) { @@ -561,6 +592,18 @@ public List executionInterceptors() { return Collections.unmodifiableList(executionInterceptors); } + @Override + public ScheduledExecutorService scheduledExecutorService() + { + return scheduledExecutorService; + } + + @Override + public Builder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; + return this; + } + @Override public Builder putAdvancedOption(SdkAdvancedClientOption option, T value) { this.advancedOptions.put(option, value); diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/client/builder/DefaultClientBuilderTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/client/builder/DefaultClientBuilderTest.java index ec526330cdc9..bc4a00954ca8 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/client/builder/DefaultClientBuilderTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/client/builder/DefaultClientBuilderTest.java @@ -38,6 +38,7 @@ import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_FILE_SUPPLIER; import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_NAME; import static software.amazon.awssdk.core.client.config.SdkClientOption.RETRY_POLICY; +import static software.amazon.awssdk.core.client.config.SdkClientOption.SCHEDULED_EXECUTOR_SERVICE; import static software.amazon.awssdk.core.internal.SdkInternalTestAdvancedClientOption.ENDPOINT_OVERRIDDEN_OVERRIDE; import java.beans.BeanInfo; @@ -52,6 +53,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; import org.assertj.core.api.Assertions; import org.junit.Before; @@ -76,6 +79,7 @@ import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.profiles.ProfileFile; import software.amazon.awssdk.utils.AttributeMap; +import software.amazon.awssdk.utils.ScheduledExecutorUtils.UnmanagedScheduledExecutorService; import software.amazon.awssdk.utils.StringInputStream; /** @@ -132,6 +136,7 @@ public void overrideConfigurationReturnsSetValues() { .type(ProfileFile.Type.CONFIGURATION) .build(); String profileName = "name"; + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .executionInterceptors(interceptors) @@ -148,6 +153,7 @@ public void overrideConfigurationReturnsSetValues() { .metricPublishers(metricPublishers) .executionAttributes(executionAttributes) .putAdvancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE, Boolean.TRUE) + .scheduledExecutorService(scheduledExecutorService) .build(); TestClientBuilder builder = testClientBuilder().overrideConfiguration(overrideConfig); @@ -166,6 +172,7 @@ public void overrideConfigurationReturnsSetValues() { assertThat(builderOverrideConfig.metricPublishers()).isEqualTo(metricPublishers); assertThat(builderOverrideConfig.executionAttributes().getAttributes()).isEqualTo(executionAttributes.getAttributes()); assertThat(builderOverrideConfig.advancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE)).isEqualTo(Optional.of(Boolean.TRUE)); + assertThat(builderOverrideConfig.scheduledExecutorService().get()).isEqualTo(scheduledExecutorService); } @Test @@ -189,6 +196,7 @@ public void overrideConfigurationOmitsUnsetValues() { assertThat(builderOverrideConfig.metricPublishers()).isEmpty(); assertThat(builderOverrideConfig.executionAttributes().getAttributes()).isEmpty(); assertThat(builderOverrideConfig.advancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE)).isEmpty(); + assertThat(builderOverrideConfig.scheduledExecutorService()).isEmpty(); } @Test @@ -198,6 +206,7 @@ public void buildIncludesClientOverrides() { interceptors.add(interceptor); RetryPolicy retryPolicy = RetryPolicy.builder().build(); + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); Map> headers = new HashMap<>(); List headerValues = new ArrayList<>(); @@ -247,6 +256,7 @@ public void close() { .metricPublishers(metricPublishers) .executionAttributes(executionAttributes) .putAdvancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE, Boolean.TRUE) + .scheduledExecutorService(scheduledExecutorService) .build(); SdkClientConfiguration config = @@ -267,6 +277,9 @@ public void close() { assertThat(config.option(METRIC_PUBLISHERS)).contains(metricPublisher); assertThat(config.option(EXECUTION_ATTRIBUTES).getAttribute(execAttribute)).isEqualTo("value"); assertThat(config.option(ENDPOINT_OVERRIDDEN)).isEqualTo(Boolean.TRUE); + UnmanagedScheduledExecutorService customScheduledExecutorService = + (UnmanagedScheduledExecutorService) config.option(SCHEDULED_EXECUTOR_SERVICE); + assertThat(customScheduledExecutorService.scheduledExecutorService()).isEqualTo(scheduledExecutorService); } @Test diff --git a/test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/ResourceManagementTest.java b/test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/ResourceManagementTest.java index 2b047680921d..97dceccfb110 100644 --- a/test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/ResourceManagementTest.java +++ b/test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/ResourceManagementTest.java @@ -23,6 +23,8 @@ import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.http.SdkHttpClient; @@ -83,6 +85,16 @@ public void executorFromBuilderNotShutdown() { verify(executor, never()).shutdownNow(); } + @Test + public void scheduledExecutorFromBuilderNotShutdown() { + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + + asyncClientBuilder().overrideConfiguration(c -> c.scheduledExecutorService(scheduledExecutorService)).build().close(); + + verify(scheduledExecutorService, never()).shutdown(); + verify(scheduledExecutorService, never()).shutdownNow(); + } + public ProtocolRestJsonClientBuilder syncClientBuilder() { return ProtocolRestJsonClient.builder() .region(Region.US_EAST_1) diff --git a/utils/src/main/java/software/amazon/awssdk/utils/ScheduledExecutorUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/ScheduledExecutorUtils.java new file mode 100644 index 000000000000..d6e33f7a607f --- /dev/null +++ b/utils/src/main/java/software/amazon/awssdk/utils/ScheduledExecutorUtils.java @@ -0,0 +1,157 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.utils; + +import static software.amazon.awssdk.utils.Validate.paramNotNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.annotations.SdkTestInternalApi; + +/** + * Utilities that make it easier to create, use and destroy + * {@link ScheduledExecutor}s. + */ +@SdkProtectedApi +public final class ScheduledExecutorUtils { + private ScheduledExecutorUtils() { + } + + /** + * Wrap a scheduled executor in a type that cannot be closed, or shut down. + */ + public static ScheduledExecutorService unmanagedScheduledExecutor(ScheduledExecutorService executor) { + return new UnmanagedScheduledExecutorService(executor); + } + + /** + * Wrapper around {@link ScheduledExecutorService} to prevent it from being + * closed. Used when the customer provides + * a custom scheduled executor service in which case they are responsible for + * the lifecycle of it. + */ + @SdkTestInternalApi + public static final class UnmanagedScheduledExecutorService implements ScheduledExecutorService { + + private final ScheduledExecutorService delegate; + + UnmanagedScheduledExecutorService(ScheduledExecutorService delegate) { + this.delegate = paramNotNull(delegate, "ScheduledExecutorService"); + } + + public ScheduledExecutorService scheduledExecutorService() { + return delegate; + } + + @Override + public void shutdown() { + // Do nothing, this executor service is managed by the customer. + } + + @Override + public List shutdownNow() { + return new ArrayList<>(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, + TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + } +}