Use our own homemade paging library for conversation paging.

I made the lib, and Alan made the build actually work.

Co-authored-by: Alan Evans <alan@signal.org>
This commit is contained in:
Greyson Parrelli
2020-12-03 13:19:03 -05:00
parent ac41f3d662
commit 31960b53a0
49 changed files with 1596 additions and 243 deletions

View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest
xmlns:android="http://schemas.android.com/apk/res/android"
package="org.signal.paging">
</manifest>

View File

@@ -0,0 +1,64 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import androidx.lifecycle.MutableLiveData;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* We have a bit of a threading problem -- we want our controller to have a fixed size so that it
* can keep track of which ranges of requests are in flight, but it needs to make a blocking call
* to find out the size of the dataset first!
*
* So what this controller does is use a serial executor so that it can buffer calls to a secondary
* controller. The first task on the executor creates the first controller, so all future calls to
* {@link #onDataNeededAroundIndex(int)} are guaranteed to have an active controller.
*
* It's also worth noting that this controller has lifecycle that matches the {@link PagedData} that
* contains it. When invalidations come in, this class will just swap out the active controller with
* a new one.
*/
class BufferedPagingController<E> implements PagingController {
private final PagedDataSource<E> dataSource;
private final PagingConfig config;
private final MutableLiveData<List<E>> liveData;
private final Executor serializationExecutor;
private PagingController activeController;
private int lastRequestedIndex;
BufferedPagingController(PagedDataSource<E> dataSource, PagingConfig config, @NonNull MutableLiveData<List<E>> liveData) {
this.dataSource = dataSource;
this.config = config;
this.liveData = liveData;
this.serializationExecutor = Executors.newSingleThreadExecutor();
this.activeController = null;
this.lastRequestedIndex = config.startIndex();
onDataInvalidated();
}
@Override
public void onDataNeededAroundIndex(int aroundIndex) {
serializationExecutor.execute(() -> {
lastRequestedIndex = aroundIndex;
activeController.onDataNeededAroundIndex(aroundIndex);
});
}
@Override
public void onDataInvalidated() {
serializationExecutor.execute(() -> {
if (activeController != null) {
activeController.onDataInvalidated();
}
activeController = new FixedSizePagingController<>(dataSource, config, liveData, dataSource.size());
activeController.onDataNeededAroundIndex(lastRequestedIndex);
});
}
}

View File

@@ -0,0 +1,43 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
/**
* A placeholder class for efficiently storing lists that are mostly empty space.
* TODO [greyson][paging]
*/
public class CompressedList<E> extends AbstractList<E> {
private final List<E> wrapped;
public CompressedList(@NonNull List<E> source) {
this.wrapped = new ArrayList<>(source);
}
public CompressedList(int totalSize) {
this.wrapped = new ArrayList<>(totalSize);
for (int i = 0; i < totalSize; i++) {
wrapped.add(null);
}
}
@Override
public int size() {
return wrapped.size();
}
@Override
public E get(int index) {
return wrapped.get(index);
}
@Override
public E set(int globalIndex, E element) {
return wrapped.set(globalIndex, element);
}
}

View File

@@ -0,0 +1,64 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import androidx.core.util.Pools;
import java.util.BitSet;
/**
* Keeps track of what data is empty vs filled with an emphasis on doing so in a space-efficient way.
*/
class DataStatus {
private static final Pools.Pool<BitSet> POOL = new Pools.SynchronizedPool<>(1);
private final BitSet state;
private final int size;
public static DataStatus obtain(int size) {
BitSet bitset = POOL.acquire();
if (bitset == null) {
bitset = new BitSet(size);
} else {
bitset.clear();
}
return new DataStatus(size, bitset);
}
private DataStatus(int size, @NonNull BitSet bitset) {
this.size = size;
this.state = bitset;
}
void markRange(int startInclusive, int endExclusive) {
state.set(startInclusive, endExclusive, true);
}
int getEarliestUnmarkedIndexInRange(int startInclusive, int endExclusive) {
for (int i = startInclusive; i < endExclusive; i++) {
if (!state.get(i)) {
return i;
}
}
return -1;
}
int getLatestUnmarkedIndexInRange(int startInclusive, int endExclusive) {
for (int i = endExclusive - 1; i >= startInclusive; i--) {
if (!state.get(i)) {
return i;
}
}
return -1;
}
int size() {
return size;
}
void recycle() {
POOL.release(state);
}
}

View File

@@ -0,0 +1,127 @@
package org.signal.paging;
import android.util.Log;
import androidx.annotation.NonNull;
import androidx.lifecycle.MutableLiveData;
import org.signal.paging.util.LinkedBlockingLifoQueue;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* The workhorse of managing page requests.
*
* A controller whose life focuses around one invalidation cycle of a data set, and therefore has
* a fixed size throughout. It assumes that all interface methods are called on a single thread,
* which allows it to keep track of pending requests in a thread-safe way, while spinning off
* tasks to fetch data on its own executor.
*/
class FixedSizePagingController<E> implements PagingController {
private static final String TAG = FixedSizePagingController.class.getSimpleName();
private static final Executor FETCH_EXECUTOR = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingLifoQueue<>(), r -> new Thread(r, "signal-FixedSizedPagingController"));
private static final boolean DEBUG = false;
private final PagedDataSource<E> dataSource;
private final PagingConfig config;
private final MutableLiveData<List<E>> liveData;
private final DataStatus loadState;
private List<E> data;
private volatile boolean invalidated;
FixedSizePagingController(@NonNull PagedDataSource<E> dataSource,
@NonNull PagingConfig config,
@NonNull MutableLiveData<List<E>> liveData,
int size)
{
this.dataSource = dataSource;
this.config = config;
this.liveData = liveData;
this.loadState = DataStatus.obtain(size);
this.data = new CompressedList<>(loadState.size());
}
/**
* We assume this method is always called on the same thread, so we can read our
* {@code loadState} and construct the parameters of a fetch request. That fetch request can
* then be performed on separate single-thread LIFO executor.
*/
@Override
public void onDataNeededAroundIndex(int aroundIndex) {
if (invalidated) {
Log.w(TAG, buildLog(aroundIndex, "Invalidated! At very beginning."));
return;
}
int leftPageBoundary = (aroundIndex / config.pageSize()) * config.pageSize();
int rightPageBoundary = leftPageBoundary + config.pageSize();
int buffer = config.bufferPages() * config.pageSize();
int leftLoadBoundary = Math.max(0, leftPageBoundary - buffer);
int rightLoadBoundary = Math.min(loadState.size(), rightPageBoundary + buffer);
int loadStart = loadState.getEarliestUnmarkedIndexInRange(leftLoadBoundary, rightLoadBoundary);
if (loadStart < 0) {
if (DEBUG) Log.i(TAG, buildLog(aroundIndex, "loadStart < 0"));
return;
}
int loadEnd = loadState.getLatestUnmarkedIndexInRange(Math.max(leftLoadBoundary, loadStart), rightLoadBoundary) + 1;
if (loadEnd <= loadStart) {
if (DEBUG) Log.i(TAG, buildLog(aroundIndex, "loadEnd <= loadStart, loadEnd: " + loadEnd + ", loadStart: " + loadStart));
return;
}
int totalSize = loadState.size();
loadState.markRange(loadStart, loadEnd);
if (DEBUG) Log.i(TAG, buildLog(aroundIndex, "start: " + loadStart + ", end: " + loadEnd + ", totalSize: " + totalSize));
FETCH_EXECUTOR.execute(() -> {
if (invalidated) {
Log.w(TAG, buildLog(aroundIndex, "Invalidated! At beginning of load task."));
return;
}
List<E> loaded = dataSource.load(loadStart, loadEnd - loadStart, () -> invalidated);
if (invalidated) {
Log.w(TAG, buildLog(aroundIndex, "Invalidated! Just after data was loaded."));
return;
}
List<E> updated = new CompressedList<>(data);
for (int i = 0, len = Math.min(loaded.size(), data.size() - loadStart); i < len; i++) {
updated.set(loadStart + i, loaded.get(i));
}
data = updated;
liveData.postValue(updated);
});
}
@Override
public void onDataInvalidated() {
if (invalidated) {
return;
}
invalidated = true;
loadState.recycle();
}
private static String buildLog(int aroundIndex, String message) {
return "onDataNeededAroundIndex(" + aroundIndex + ") " + message;
}
}

View File

@@ -0,0 +1,40 @@
package org.signal.paging;
import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;
import java.util.List;
/**
* The primary entry point for creating paged data.
*/
public final class PagedData<E> {
private final LiveData<List<E>> data;
private final PagingController controller;
@AnyThread
public static <E> PagedData<E> create(@NonNull PagedDataSource<E> dataSource, @NonNull PagingConfig config) {
MutableLiveData<List<E>> liveData = new MutableLiveData<>();
PagingController controller = new BufferedPagingController<>(dataSource, config, liveData);
return new PagedData<>(liveData, controller);
}
private PagedData(@NonNull LiveData<List<E>> data, @NonNull PagingController controller) {
this.data = data;
this.controller = controller;
}
@AnyThread
public @NonNull LiveData<List<E>> getData() {
return data;
}
@AnyThread
public @NonNull PagingController getController() {
return controller;
}
}

View File

@@ -0,0 +1,35 @@
package org.signal.paging;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import java.util.List;
/**
* Represents a source of data that can be queried.
*/
public interface PagedDataSource<T> {
/**
* @return The total size of the data set.
*/
@WorkerThread
int size();
/**
* @param start The index of the first item that should be included in your results.
* @param length The total number of items you should return.
* @param cancellationSignal An object that you can check to see if the load operation was canceled.
*
* @return A list of length {@code length} that represents the data starting at {@code start}.
* If you don't have the full range, just populate what you can.
*/
@WorkerThread
@NonNull List<T> load(int start, int length, @NonNull CancellationSignal cancellationSignal);
interface CancellationSignal {
/**
* @return True if the operation has been canceled, otherwise false.
*/
boolean isCanceled();
}
}

View File

@@ -0,0 +1,79 @@
package org.signal.paging;
import androidx.annotation.NonNull;
/**
* Describes various properties of how you'd like paging to be handled.
*/
public final class PagingConfig {
private final int bufferPages;
private final int startIndex;
private final int pageSize;
private PagingConfig(@NonNull Builder builder) {
this.bufferPages = builder.bufferPages;
this.startIndex = builder.startIndex;
this.pageSize = builder.pageSize;
}
/**
* @return How many pages of 'buffer' you want ahead of and behind the active position. i.e. if
* the {@code pageSize()} is 10 and you specify 2 buffer pages, then there will always be
* at least 20 items ahead of and behind the current position.
*/
int bufferPages() {
return bufferPages;
}
/**
* @return How much data to load at a time when paging data.
*/
int pageSize() {
return pageSize;
}
/**
* @return What position to start loading at
*/
int startIndex() {
return startIndex;
}
public static class Builder {
private int bufferPages = 1;
private int startIndex = 0;
private int pageSize = 50;
public @NonNull Builder setBufferPages(int bufferPages) {
if (bufferPages < 1) {
throw new IllegalArgumentException("You must have at least one buffer page! Requested: " + bufferPages);
}
this.bufferPages = bufferPages;
return this;
}
public @NonNull Builder setPageSize(int pageSize) {
if (pageSize < 1) {
throw new IllegalArgumentException("You must have a page size of at least one! Requested: " + pageSize);
}
this.pageSize = pageSize;
return this;
}
public @NonNull Builder setStartIndex(int startIndex) {
if (startIndex < 0) {
throw new IndexOutOfBoundsException("Requested: " + startIndex);
}
this.startIndex = startIndex;
return this;
}
public @NonNull PagingConfig build() {
return new PagingConfig(this);
}
}
}

View File

@@ -0,0 +1,7 @@
package org.signal.paging;
public interface PagingController {
void onDataNeededAroundIndex(int aroundIndex);
void onDataInvalidated();
}

View File

@@ -0,0 +1,36 @@
package org.signal.paging;
import android.util.Log;
import androidx.annotation.Nullable;
/**
* A controller that forwards calls to a secondary, proxied controller. This is useful when you want
* to keep a single, static controller, even when the true controller may be changing due to data
* source changes.
*/
public class ProxyPagingController implements PagingController {
private PagingController proxied;
@Override
public synchronized void onDataNeededAroundIndex(int aroundIndex) {
if (proxied != null) {
proxied.onDataNeededAroundIndex(aroundIndex);
}
}
@Override
public synchronized void onDataInvalidated() {
if (proxied != null) {
proxied.onDataInvalidated();
}
}
/**
* Updates the underlying controller to the one specified.
*/
public synchronized void set(@Nullable PagingController bound) {
this.proxied = bound;
}
}

View File

@@ -0,0 +1,23 @@
package org.signal.paging.util;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingLifoQueue<E> extends LinkedBlockingDeque<E> {
@Override
public void put(E runnable) throws InterruptedException {
super.putFirst(runnable);
}
@Override
public boolean add(E runnable) {
super.addFirst(runnable);
return true;
}
@Override
public boolean offer(E runnable) {
super.addFirst(runnable);
return true;
}
}

View File

@@ -0,0 +1,55 @@
package org.signal.paging.util;
import android.os.Handler;
import android.os.Looper;
import androidx.annotation.NonNull;
import java.util.concurrent.CountDownLatch;
public final class Util {
private static volatile Handler handler;
private Util() {}
public static void runOnMain(final @NonNull Runnable runnable) {
if (isMainThread()) runnable.run();
else getHandler().post(runnable);
}
public static void runOnMainSync(final @NonNull Runnable runnable) {
if (isMainThread()) {
runnable.run();
} else {
final CountDownLatch sync = new CountDownLatch(1);
runOnMain(() -> {
try {
runnable.run();
} finally {
sync.countDown();
}
});
try {
sync.await();
} catch (InterruptedException ie) {
throw new AssertionError(ie);
}
}
}
public static boolean isMainThread() {
return Looper.myLooper() == Looper.getMainLooper();
}
private static Handler getHandler() {
if (handler == null) {
synchronized (Util.class) {
if (handler == null) {
handler = new Handler(Looper.getMainLooper());
}
}
}
return handler;
}
}