Add tests for WhisperServerService#run

Additionally, `LocalWhisperServerService` may be used for integration testing.
This commit is contained in:
Chris Eager
2024-04-29 11:05:35 -05:00
committed by GitHub
parent b6f8bca361
commit 0e4be0c85a
84 changed files with 2156 additions and 552 deletions

View File

@@ -1,22 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.s3;
import io.dropwizard.lifecycle.Managed;
import java.util.function.Supplier;
public interface ManagedSupplier<T> extends Supplier<T>, Managed {
@Override
default void start() throws Exception {
// noop
}
@Override
default void stop() throws Exception {
// noop
}
}

View File

@@ -1,93 +0,0 @@
/*
* Copyright 2013-2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.s3;
import static com.codahale.metrics.MetricRegistry.name;
import static java.util.Objects.requireNonNull;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.MonitoredS3ObjectConfiguration;
public class S3MonitoringSupplier<T> implements ManagedSupplier<T> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Nonnull
private final Timer refreshTimer;
@Nonnull
private final Counter refreshErrors;
@Nonnull
private final AtomicReference<T> holder;
@Nonnull
private final S3ObjectMonitor monitor;
@Nonnull
private final Function<InputStream, T> parser;
public S3MonitoringSupplier(
@Nonnull final ScheduledExecutorService executor,
@Nonnull final MonitoredS3ObjectConfiguration cfg,
@Nonnull final Function<InputStream, T> parser,
@Nonnull final T initial,
@Nonnull final String name) {
this.refreshTimer = Metrics.timer(name(name, "refresh"));
this.refreshErrors = Metrics.counter(name(name, "refreshErrors"));
this.holder = new AtomicReference<>(initial);
this.parser = requireNonNull(parser);
this.monitor = new S3ObjectMonitor(
cfg.s3Region(),
cfg.s3Bucket(),
cfg.objectKey(),
cfg.maxSize(),
executor,
cfg.refreshInterval(),
this::handleObjectChange
);
}
@Override
@Nonnull
public T get() {
return requireNonNull(holder.get());
}
@Override
public void start() throws Exception {
monitor.start();
}
@Override
public void stop() throws Exception {
monitor.stop();
}
private void handleObjectChange(@Nonnull final InputStream inputStream) {
refreshTimer.record(() -> {
// parser function is supposed to close the input stream
try {
holder.set(parser.apply(inputStream));
} catch (final Exception e) {
log.error("failed to update internal state from the monitored object", e);
refreshErrors.increment();
}
});
}
}

View File

@@ -17,7 +17,7 @@ import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerService;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
@@ -39,8 +39,6 @@ public class S3ObjectMonitor {
private final Duration refreshInterval;
private ScheduledFuture<?> refreshFuture;
private final Consumer<InputStream> changeListener;
private final AtomicReference<String> lastETag = new AtomicReference<>();
private final S3Client s3Client;
@@ -48,24 +46,23 @@ public class S3ObjectMonitor {
private static final Logger log = LoggerFactory.getLogger(S3ObjectMonitor.class);
public S3ObjectMonitor(
final AwsCredentialsProvider awsCredentialsProvider,
final String s3Region,
final String s3Bucket,
final String objectKey,
final long maxObjectSize,
final ScheduledExecutorService refreshExecutorService,
final Duration refreshInterval,
final Consumer<InputStream> changeListener) {
final Duration refreshInterval) {
this(S3Client.builder()
.region(Region.of(s3Region))
.credentialsProvider(WhisperServerService.AWSSDK_CREDENTIALS_PROVIDER)
.credentialsProvider(awsCredentialsProvider)
.build(),
s3Bucket,
objectKey,
maxObjectSize,
refreshExecutorService,
refreshInterval,
changeListener);
refreshInterval);
}
@VisibleForTesting
@@ -75,8 +72,7 @@ public class S3ObjectMonitor {
final String objectKey,
final long maxObjectSize,
final ScheduledExecutorService refreshExecutorService,
final Duration refreshInterval,
final Consumer<InputStream> changeListener) {
final Duration refreshInterval) {
this.s3Client = s3Client;
this.s3Bucket = s3Bucket;
@@ -85,21 +81,19 @@ public class S3ObjectMonitor {
this.refreshExecutorService = refreshExecutorService;
this.refreshInterval = refreshInterval;
this.changeListener = changeListener;
}
public synchronized void start() {
public synchronized void start(final Consumer<InputStream> changeListener) {
if (refreshFuture != null) {
throw new RuntimeException("S3 object manager already started");
}
// Run the first request immediately/blocking, then start subsequent calls.
log.info("Initial request for s3://{}/{}", s3Bucket, objectKey);
refresh();
refresh(changeListener);
refreshFuture = refreshExecutorService
.scheduleAtFixedRate(this::refresh, refreshInterval.toMillis(), refreshInterval.toMillis(),
.scheduleAtFixedRate(() -> refresh(changeListener), refreshInterval.toMillis(), refreshInterval.toMillis(),
TimeUnit.MILLISECONDS);
}
@@ -139,7 +133,7 @@ public class S3ObjectMonitor {
* changed since the last call to {@link #getObject()} or {@code refresh()}.
*/
@VisibleForTesting
void refresh() {
void refresh(final Consumer<InputStream> changeListener) {
try {
final HeadObjectResponse objectMetadata = s3Client.headObject(HeadObjectRequest.builder()
.bucket(s3Bucket)