From e7f8d3619960dac1cf5262099729f202cd22953b Mon Sep 17 00:00:00 2001 From: Clark Date: Fri, 14 Apr 2023 09:14:35 -0400 Subject: [PATCH] Fix multidevice contact sync update job reporting wrong content length. --- .../securesms/audio/AudioRecorder.java | 30 ++++---- .../securesms/audio/MediaRecorderWrapper.java | 9 +++ .../jobs/MultiDeviceContactUpdateJob.java | 53 +++++++++---- .../jobs/MultiDeviceGroupUpdateJob.java | 19 +++-- .../logsubmit/SubmitDebugLogRepository.java | 16 ++-- .../securesms/providers/BlobProvider.java | 76 ++++++------------- 6 files changed, 106 insertions(+), 97 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java b/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java index 568828379f..f466795b87 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java +++ b/app/src/main/java/org/thoughtcrime/securesms/audio/AudioRecorder.java @@ -17,7 +17,9 @@ import org.thoughtcrime.securesms.providers.BlobProvider; import org.thoughtcrime.securesms.util.MediaUtil; import java.io.IOException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.subjects.SingleSubject; @@ -32,8 +34,8 @@ public class AudioRecorder { private final AudioRecordingHandler uiHandler; private final AudioRecorderFocusManager audioFocusManager; - private Recorder recorder; - private Uri captureUri; + private Recorder recorder; + private Future recordingUriFuture; private SingleSubject recordingSubject; @@ -75,11 +77,12 @@ public class AudioRecorder { ParcelFileDescriptor fds[] = ParcelFileDescriptor.createPipe(); - captureUri = BlobProvider.getInstance() - .forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0) - .withMimeType(MediaUtil.AUDIO_AAC) - .createForDraftAttachmentAsync(context, () -> Log.i(TAG, "Write successful."), e -> Log.w(TAG, "Error during recording", e)); - recorder = Build.VERSION.SDK_INT >= 26 ? new MediaRecorderWrapper() : new AudioCodec(); + recordingUriFuture = BlobProvider.getInstance() + .forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0) + .withMimeType(MediaUtil.AUDIO_AAC) + .createForDraftAttachmentAsync(context); + + recorder = Build.VERSION.SDK_INT >= 26 ? new MediaRecorderWrapper() : new AudioCodec(); int focusResult = audioFocusManager.requestAudioFocus(); if (focusResult != AudioManager.AUDIOFOCUS_REQUEST_GRANTED) { Log.w(TAG, "Could not gain audio focus. Received result code " + focusResult); @@ -109,16 +112,17 @@ public class AudioRecorder { recorder.stop(); try { - long size = MediaUtil.getMediaSize(context, captureUri); - recordingSubject.onSuccess(new VoiceNoteDraft(captureUri, size)); - } catch (IOException ioe) { + Uri uri = recordingUriFuture.get(); + long size = MediaUtil.getMediaSize(context, uri); + recordingSubject.onSuccess(new VoiceNoteDraft(uri, size)); + } catch (IOException | ExecutionException | InterruptedException ioe) { Log.w(TAG, ioe); recordingSubject.onError(ioe); } - recordingSubject = null; - recorder = null; - captureUri = null; + recordingSubject = null; + recorder = null; + recordingUriFuture = null; }); } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/audio/MediaRecorderWrapper.java b/app/src/main/java/org/thoughtcrime/securesms/audio/MediaRecorderWrapper.java index 76f2e20167..c35f545422 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/audio/MediaRecorderWrapper.java +++ b/app/src/main/java/org/thoughtcrime/securesms/audio/MediaRecorderWrapper.java @@ -4,6 +4,7 @@ import android.media.MediaRecorder; import android.os.Build; import android.os.ParcelFileDescriptor; +import org.signal.core.util.StreamUtil; import org.signal.core.util.logging.Log; import java.io.IOException; @@ -20,10 +21,14 @@ public class MediaRecorderWrapper implements Recorder { private static final int BIT_RATE = 32000; private MediaRecorder recorder = null; + + private ParcelFileDescriptor outputFileDescriptor; @Override public void start(ParcelFileDescriptor fileDescriptor) throws IOException { Log.i(TAG, "Recording voice note using MediaRecorderWrapper."); + this.outputFileDescriptor = fileDescriptor; + recorder = new MediaRecorder(); try { @@ -40,6 +45,8 @@ public class MediaRecorderWrapper implements Recorder { Log.w(TAG, "Unable to start recording", e); recorder.release(); recorder = null; + StreamUtil.close(outputFileDescriptor); + outputFileDescriptor = null; throw new IOException(e); } } @@ -61,6 +68,8 @@ public class MediaRecorderWrapper implements Recorder { } finally { recorder.release(); recorder = null; + StreamUtil.close(outputFileDescriptor); + outputFileDescriptor = null; } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceContactUpdateJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceContactUpdateJob.java index 95d155fdf5..90ada93c0c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceContactUpdateJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceContactUpdateJob.java @@ -53,6 +53,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class MultiDeviceContactUpdateJob extends BaseJob { @@ -139,6 +141,7 @@ public class MultiDeviceContactUpdateJob extends BaseJob { { WriteDetails writeDetails = createTempFile(); + Uri updateUri = null; try { DeviceContactsOutputStream out = new DeviceContactsOutputStream(writeDetails.outputStream); Recipient recipient = Recipient.resolved(recipientId); @@ -166,18 +169,21 @@ public class MultiDeviceContactUpdateJob extends BaseJob { archived.contains(recipientId))); out.close(); + updateUri = writeDetails.getUri(); - long length = BlobProvider.getInstance().calculateFileSize(context, writeDetails.uri); + long length = BlobProvider.getInstance().calculateFileSize(context, updateUri); sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(), - BlobProvider.getInstance().getStream(context, writeDetails.uri), + BlobProvider.getInstance().getStream(context, updateUri), length, false); - } catch(InvalidNumberException e) { + } catch(InvalidNumberException | InterruptedException e) { Log.w(TAG, e); } finally { - BlobProvider.getInstance().delete(context, writeDetails.uri); + if (updateUri != null) { + BlobProvider.getInstance().delete(context, updateUri); + } } } @@ -200,6 +206,7 @@ public class MultiDeviceContactUpdateJob extends BaseJob { WriteDetails writeDetails = createTempFile(); + Uri updateUri = null; try { DeviceContactsOutputStream out = new DeviceContactsOutputStream(writeDetails.outputStream); List recipients = SignalDatabase.recipients().getRecipientsForMultiDeviceSync(); @@ -246,16 +253,20 @@ public class MultiDeviceContactUpdateJob extends BaseJob { out.close(); - long length = BlobProvider.getInstance().calculateFileSize(context, writeDetails.uri); + updateUri = writeDetails.getUri(); + + long length = BlobProvider.getInstance().calculateFileSize(context, updateUri); sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(), - BlobProvider.getInstance().getStream(context, writeDetails.uri), + BlobProvider.getInstance().getStream(context, updateUri), length, true); - } catch(InvalidNumberException e) { + } catch(InvalidNumberException | InterruptedException e) { Log.w(TAG, e); } finally { - BlobProvider.getInstance().delete(context, writeDetails.uri); + if (updateUri != null) { + BlobProvider.getInstance().delete(context, updateUri); + } } } @@ -375,14 +386,12 @@ public class MultiDeviceContactUpdateJob extends BaseJob { private @NonNull WriteDetails createTempFile() throws IOException { ParcelFileDescriptor[] pipe = ParcelFileDescriptor.createPipe(); InputStream inputStream = new ParcelFileDescriptor.AutoCloseInputStream(pipe[0]); - Uri uri = BlobProvider.getInstance() + Future futureUri = BlobProvider.getInstance() .forData(inputStream, 0) .withFileName("multidevice-contact-update") - .createForSingleSessionOnDiskAsync(context, - () -> Log.i(TAG, "Write successful."), - e -> Log.w(TAG, "Error during write.", e)); + .createForSingleSessionOnDiskAsync(context); - return new WriteDetails(uri, new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1])); + return new WriteDetails(futureUri, new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1])); } private static class NetworkException extends Exception { @@ -393,13 +402,25 @@ public class MultiDeviceContactUpdateJob extends BaseJob { } private static class WriteDetails { - private final Uri uri; + private final Future futureUri; private final OutputStream outputStream; - private WriteDetails(@NonNull Uri uri, @NonNull OutputStream outputStream) { - this.uri = uri; + private WriteDetails(@NonNull Future blobUri, @NonNull OutputStream outputStream) { + this.futureUri = blobUri; this.outputStream = outputStream; } + + public Uri getUri() throws IOException, InterruptedException { + try { + return futureUri.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + } } public static final class Factory implements Job.Factory { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceGroupUpdateJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceGroupUpdateJob.java index bd42f6d567..d4560ae7c7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceGroupUpdateJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/MultiDeviceGroupUpdateJob.java @@ -13,7 +13,6 @@ import org.thoughtcrime.securesms.database.GroupTable; import org.thoughtcrime.securesms.database.SignalDatabase; import org.thoughtcrime.securesms.database.model.GroupRecord; import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; -import org.thoughtcrime.securesms.jobmanager.JsonJobData; import org.thoughtcrime.securesms.jobmanager.Job; import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; import org.thoughtcrime.securesms.keyvalue.SignalStore; @@ -43,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class MultiDeviceGroupUpdateJob extends BaseJob { @@ -92,12 +92,12 @@ public class MultiDeviceGroupUpdateJob extends BaseJob { ParcelFileDescriptor[] pipe = ParcelFileDescriptor.createPipe(); InputStream inputStream = new ParcelFileDescriptor.AutoCloseInputStream(pipe[0]); - Uri uri = BlobProvider.getInstance() + Future futureUri = BlobProvider.getInstance() .forData(inputStream, 0) .withFileName("multidevice-group-update") - .createForSingleSessionOnDiskAsync(context, - () -> Log.i(TAG, "Write successful."), - e -> Log.w(TAG, "Error during write.", e)); + .createForSingleSessionOnDiskAsync(context); + + Uri blobUri = null; try (GroupTable.Reader reader = SignalDatabase.groups().getGroups()) { DeviceGroupsOutputStream out = new DeviceGroupsOutputStream(new ParcelFileDescriptor.AutoCloseOutputStream(pipe[1])); @@ -138,10 +138,11 @@ public class MultiDeviceGroupUpdateJob extends BaseJob { out.close(); if (hasData) { - long length = BlobProvider.getInstance().calculateFileSize(context, uri); + blobUri = futureUri.get(); + long length = BlobProvider.getInstance().calculateFileSize(context, blobUri); sendUpdate(ApplicationDependencies.getSignalServiceMessageSender(), - BlobProvider.getInstance().getStream(context, uri), + BlobProvider.getInstance().getStream(context, blobUri), length); } else { Log.w(TAG, "No groups present for sync message. Sending an empty update."); @@ -151,7 +152,9 @@ public class MultiDeviceGroupUpdateJob extends BaseJob { 0); } } finally { - BlobProvider.getInstance().delete(context, uri); + if (blobUri != null) { + BlobProvider.getInstance().delete(context, blobUri); + } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/logsubmit/SubmitDebugLogRepository.java b/app/src/main/java/org/thoughtcrime/securesms/logsubmit/SubmitDebugLogRepository.java index 921c7abe5b..66348ceafe 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/logsubmit/SubmitDebugLogRepository.java +++ b/app/src/main/java/org/thoughtcrime/securesms/logsubmit/SubmitDebugLogRepository.java @@ -29,13 +29,14 @@ import org.signal.core.util.Stopwatch; import java.io.IOException; import java.io.OutputStream; -import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.regex.Pattern; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; @@ -179,11 +180,11 @@ public class SubmitDebugLogRepository { try { Stopwatch stopwatch = new Stopwatch("log-upload"); - ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); - Uri gzipUri = BlobProvider.getInstance() - .forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0) - .withMimeType("application/gzip") - .createForSingleSessionOnDiskAsync(context, null, null); + ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); + Future futureUri = BlobProvider.getInstance() + .forData(new ParcelFileDescriptor.AutoCloseInputStream(fds[0]), 0) + .withMimeType("application/gzip") + .createForSingleSessionOnDiskAsync(context); OutputStream gzipOutput = new GZIPOutputStream(new ParcelFileDescriptor.AutoCloseOutputStream(fds[1])); @@ -202,6 +203,7 @@ public class SubmitDebugLogRepository { } StreamUtil.close(gzipOutput); + Uri gzipUri = futureUri.get(); stopwatch.split("body"); @@ -228,7 +230,7 @@ public class SubmitDebugLogRepository { BlobProvider.getInstance().delete(context, gzipUri); return Optional.of(logUrl); - } catch (IOException e) { + } catch (IOException | RuntimeException | ExecutionException | InterruptedException e) { Log.w(TAG, "Error during log upload.", e); return Optional.empty(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/providers/BlobProvider.java b/app/src/main/java/org/thoughtcrime/securesms/providers/BlobProvider.java index 0bc38b10a7..c78b8a702c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/providers/BlobProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/providers/BlobProvider.java @@ -40,8 +40,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; /** @@ -329,32 +329,22 @@ public class BlobProvider { { waitUntilInitialized(); - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exception = new AtomicReference<>(null); - Uri uri = writeBlobSpecToDiskAsync(context, blobSpec, latch::countDown, e -> { - exception.set(e); - latch.countDown(); - }); - + Future uriFuture = writeBlobSpecToDiskAsync(context, blobSpec); try { - latch.await(); - } catch (InterruptedException e) { - throw new IOException(e); + return uriFuture.get(); + } catch (ExecutionException | InterruptedException e) { + Log.e(TAG, "Error writing blob spec to disk", e); + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw new IOException(e); + } } - - if (exception.get() != null) { - throw exception.get(); - } - - return uri; } @WorkerThread - private synchronized @NonNull Uri writeBlobSpecToDiskAsync(@NonNull Context context, - @NonNull BlobSpec blobSpec, - @Nullable SuccessListener successListener, - @Nullable ErrorListener errorListener) + private synchronized @NonNull Future writeBlobSpecToDiskAsync(@NonNull Context context, @NonNull BlobSpec blobSpec) throws IOException { AttachmentSecret attachmentSecret = AttachmentSecretProvider.getInstance(context).getOrCreateAttachmentSecret(); @@ -362,22 +352,18 @@ public class BlobProvider { File outputFile = new File(getOrCreateDirectory(context, directory), buildFileName(blobSpec.id)); OutputStream outputStream = ModernEncryptingPartOutputStream.createFor(attachmentSecret, outputFile, true).second; - SignalExecutors.UNBOUNDED.execute(() -> { + final Uri uri = buildUri(blobSpec); + + return SignalExecutors.BOUNDED.submit(() -> { try { StreamUtil.copy(blobSpec.getData(), outputStream); - - if (successListener != null) { - successListener.onSuccess(); - } + return uri; } catch (IOException e) { + delete(context, uri); Log.w(TAG, "Error during write!", e); - if (errorListener != null) { - errorListener.onError(e); - } + throw e; } }); - - return buildUri(blobSpec); } private synchronized @NonNull Uri writeBlobSpecToMemory(@NonNull BlobSpec blobSpec, @NonNull byte[] data) { @@ -470,18 +456,14 @@ public class BlobProvider { } /** - * Create a blob that will exist for a single app session. An app session is defined as the + * Create an async blob that will exist for a single app session. An app session is defined as the * period from one {@link Application#onCreate()} to the next. The file will be created on disk * synchronously, but the data will copied asynchronously. This is helpful when the copy is * long-running, such as in the case of recording a voice note. */ @WorkerThread - public Uri createForSingleSessionOnDiskAsync(@NonNull Context context, - @Nullable SuccessListener successListener, - @Nullable ErrorListener errorListener) - throws IOException - { - return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.SINGLE_SESSION_DISK), successListener, errorListener); + public Future createForSingleSessionOnDiskAsync(@NonNull Context context) throws IOException { + return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.SINGLE_SESSION_DISK)); } /** @@ -502,12 +484,10 @@ public class BlobProvider { * when the blob is no longer in use. */ @WorkerThread - public Uri createForDraftAttachmentAsync(@NonNull Context context, - @Nullable SuccessListener successListener, - @Nullable ErrorListener errorListener) + public Future createForDraftAttachmentAsync(@NonNull Context context) throws IOException { - return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.ATTACHMENT_DRAFT), successListener, errorListener); + return writeBlobSpecToDiskAsync(context, buildBlobSpec(StorageType.ATTACHMENT_DRAFT)); } } @@ -562,16 +542,6 @@ public class BlobProvider { } } - public interface SuccessListener { - @WorkerThread - void onSuccess(); - } - - public interface ErrorListener { - @WorkerThread - void onError(IOException e); - } - private static class BlobSpec { private final InputStream data;