Skip to content

Commit

Permalink
Move ExtractorMediaPeriod to new SampleQueue methods
Browse files Browse the repository at this point in the history
This change allows you to enable/disable tracks within which
all samples are key-frames without any re-buffering (e.g. audio,
text and metadata). This effectively reverts V2 back to the
behavior in V1, only this time we're doing it properly. []ly
disabling/enabling, or disabling/enabling whilst paused, no longer
cause samples to get "lost" between the source and renderers.

Note it also becomes really easy to support a few other things,
although support is not exposed in this change:

- Enable/disable video tracks without any re-buffering, by
  changing the toKeyframe argument passed to discardTo to true.
- Retain media in the buffer for some time after it's been played
  (e.g. to support a single back-5s-seek efficiently), by
  subtracting the desired back-buffer time from the value that's
  passed to discardTo.

Issue: #2956
Issue: #2926

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=160128586
  • Loading branch information
ojw28 committed Jun 26, 2017
1 parent b3c6f6f commit 1b71e3b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
* A {@link MediaPeriod} that extracts data using an {@link Extractor}.
*/
/* package */ final class ExtractorMediaPeriod implements MediaPeriod, ExtractorOutput,
Loader.Callback<ExtractorMediaPeriod.ExtractingLoadable>, UpstreamFormatChangedListener {
Loader.Callback<ExtractorMediaPeriod.ExtractingLoadable>, Loader.ReleaseCallback,
UpstreamFormatChangedListener {

/**
* When the source's duration is unknown, it is calculated by adding this value to the largest
Expand Down Expand Up @@ -146,21 +147,27 @@ public void run() {
}

public void release() {
final ExtractorHolder extractorHolder = this.extractorHolder;
loader.release(new Runnable() {
@Override
public void run() {
extractorHolder.release();
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).disable();
}
boolean releasedSynchronously = loader.release(this);
if (!releasedSynchronously) {
// Discard as much as we can synchronously.
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).discardToEnd();
}
});
}
handler.removeCallbacksAndMessages(null);
released = true;
}

@Override
public void onLoaderReleased() {
extractorHolder.release();
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(true);
}
}

@Override
public void prepare(Callback callback, long positionUs) {
this.callback = callback;
Expand All @@ -182,19 +189,21 @@ public TrackGroupArray getTrackGroups() {
public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamFlags,
SampleStream[] streams, boolean[] streamResetFlags, long positionUs) {
Assertions.checkState(prepared);
// Disable old tracks.
int oldEnabledTrackCount = enabledTrackCount;
// Deselect old tracks.
for (int i = 0; i < selections.length; i++) {
if (streams[i] != null && (selections[i] == null || !mayRetainStreamFlags[i])) {
int track = ((SampleStreamImpl) streams[i]).track;
Assertions.checkState(trackEnabledStates[track]);
enabledTrackCount--;
trackEnabledStates[track] = false;
sampleQueues.valueAt(track).disable();
streams[i] = null;
}
}
// Enable new tracks.
boolean selectedNewTracks = false;
// We'll always need to seek if this is a first selection to a non-zero position, or if we're
// making a selection having previously disabled all tracks.
boolean seekRequired = seenFirstTrackSelection ? oldEnabledTrackCount == 0 : positionUs != 0;
// Select new tracks.
for (int i = 0; i < selections.length; i++) {
if (streams[i] == null && selections[i] != null) {
TrackSelection selection = selections[i];
Expand All @@ -206,16 +215,12 @@ public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamF
trackEnabledStates[track] = true;
streams[i] = new SampleStreamImpl(track);
streamResetFlags[i] = true;
selectedNewTracks = true;
}
}
if (!seenFirstTrackSelection) {
// At the time of the first track selection all queues will be enabled, so we need to disable
// any that are no longer required.
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
if (!trackEnabledStates[i]) {
sampleQueues.valueAt(i).disable();
// If there's still a chance of avoiding a seek, try and seek within the sample queue.
if (!seekRequired) {
SampleQueue sampleQueue = sampleQueues.valueAt(i);
sampleQueue.rewind();
seekRequired = !sampleQueue.advanceTo(positionUs, true, true)
&& sampleQueue.getReadIndex() != 0;
}
}
}
Expand All @@ -224,7 +229,7 @@ public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamF
if (loader.isLoading()) {
loader.cancelLoading();
}
} else if (seenFirstTrackSelection ? selectedNewTracks : positionUs != 0) {
} else if (seekRequired) {
positionUs = seekToUs(positionUs);
// We'll need to reset renderers consuming from all streams due to the seek.
for (int i = 0; i < streams.length; i++) {
Expand All @@ -239,7 +244,10 @@ public long selectTracks(TrackSelection[] selections, boolean[] mayRetainStreamF

@Override
public void discardBuffer(long positionUs) {
// Do nothing.
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).discardTo(positionUs, false, trackEnabledStates[i]);
}
}

@Override
Expand Down Expand Up @@ -303,9 +311,13 @@ public long seekToUs(long positionUs) {
// If we're not pending a reset, see if we can seek within the sample queues.
boolean seekInsideBuffer = !isPendingReset();
for (int i = 0; seekInsideBuffer && i < trackCount; i++) {
if (trackEnabledStates[i]) {
seekInsideBuffer = sampleQueues.valueAt(i).skipToKeyframeBefore(positionUs, false);
}
SampleQueue sampleQueue = sampleQueues.valueAt(i);
sampleQueue.rewind();
// TODO: For sparse tracks (e.g. text, metadata) this may return false when an in-buffer
// seek should be allowed. If there are non-sparse tracks (e.g. video, audio) for which
// in-buffer seeking is successful, we should perform an in-buffer seek unconditionally.
seekInsideBuffer = sampleQueue.advanceTo(positionUs, true, false);
sampleQueue.discardToRead();
}
// If we failed to seek within the sample queues, we need to restart.
if (!seekInsideBuffer) {
Expand Down Expand Up @@ -338,17 +350,16 @@ public long seekToUs(long positionUs) {
if (notifyReset || isPendingReset()) {
return C.RESULT_NOTHING_READ;
}

return sampleQueues.valueAt(track).readData(formatHolder, buffer, formatRequired,
return sampleQueues.valueAt(track).read(formatHolder, buffer, formatRequired,
loadingFinished, lastSeekPositionUs);
}

/* package */ void skipData(int track, long positionUs) {
SampleQueue sampleQueue = sampleQueues.valueAt(track);
if (loadingFinished && positionUs > sampleQueue.getLargestQueuedTimestampUs()) {
sampleQueue.skipAll();
sampleQueue.advanceToEnd();
} else {
sampleQueue.skipToKeyframeBefore(positionUs, true);
sampleQueue.advanceTo(positionUs, true, true);
}
}

Expand All @@ -372,12 +383,15 @@ public void onLoadCompleted(ExtractingLoadable loadable, long elapsedRealtimeMs,
@Override
public void onLoadCanceled(ExtractingLoadable loadable, long elapsedRealtimeMs,
long loadDurationMs, boolean released) {
if (released) {
return;
}
copyLengthFromLoader(loadable);
if (!released && enabledTrackCount > 0) {
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(trackEnabledStates[i]);
}
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(true);
}
if (enabledTrackCount > 0) {
callback.onContinueLoadingRequested(this);
}
}
Expand Down Expand Up @@ -508,7 +522,7 @@ private void configureRetry(ExtractingLoadable loadable) {
notifyReset = prepared;
int trackCount = sampleQueues.size();
for (int i = 0; i < trackCount; i++) {
sampleQueues.valueAt(i).reset(!prepared || trackEnabledStates[i]);
sampleQueues.valueAt(i).reset(true);
}
loadable.setLoadPosition(0, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,23 @@ public interface Callback<T extends Loadable> {

}

/**
* A callback to be notified when a {@link Loader} has finished being released.
*/
public interface ReleaseCallback {

/**
* Called when the {@link Loader} has finished being released.
*/
void onLoaderReleased();

}

public static final int RETRY = 0;
public static final int RETRY_RESET_ERROR_COUNT = 1;
public static final int DONT_RETRY = 2;
public static final int DONT_RETRY_FATAL = 3;

private static final int MSG_START = 0;
private static final int MSG_CANCEL = 1;
private static final int MSG_END_OF_SOURCE = 2;
private static final int MSG_IO_EXCEPTION = 3;
private static final int MSG_FATAL_ERROR = 4;

private final ExecutorService downloadExecutorService;

private LoadTask<? extends Loadable> currentTask;
Expand All @@ -150,7 +156,7 @@ public Loader(String threadName) {
*
* @param <T> The type of the loadable.
* @param loadable The {@link Loadable} to load.
* @param callback A callback to called when the load ends.
* @param callback A callback to be called when the load ends.
* @param defaultMinRetryCount The minimum number of times the load must be retried before
* {@link #maybeThrowError()} will propagate an error.
* @throws IllegalStateException If the calling thread does not have an associated {@link Looper}.
Expand Down Expand Up @@ -188,20 +194,28 @@ public void release() {
}

/**
* Releases the {@link Loader}, running {@code postLoadAction} on its thread. This method should
* be called when the {@link Loader} is no longer required.
* Releases the {@link Loader}. This method should be called when the {@link Loader} is no longer
* required.
*
* @param postLoadAction A {@link Runnable} to run on the loader's thread when
* {@link Loadable#load()} is no longer running.
* @param callback A callback to be called when the release ends. Will be called synchronously
* from this method if no load is in progress, or asynchronously once the load has been
* canceled otherwise. May be null.
* @return True if {@code callback} was called synchronously. False if it will be called
* asynchronously or if {@code callback} is null.
*/
public void release(Runnable postLoadAction) {
public boolean release(ReleaseCallback callback) {
boolean callbackInvoked = false;
if (currentTask != null) {
currentTask.cancel(true);
}
if (postLoadAction != null) {
downloadExecutorService.execute(postLoadAction);
if (callback != null) {
downloadExecutorService.execute(new ReleaseTask(callback));
}
} else if (callback != null) {
callback.onLoaderReleased();
callbackInvoked = true;
}
downloadExecutorService.shutdown();
return callbackInvoked;
}

// LoaderErrorThrower implementation.
Expand All @@ -228,6 +242,12 @@ private final class LoadTask<T extends Loadable> extends Handler implements Runn

private static final String TAG = "LoadTask";

private static final int MSG_START = 0;
private static final int MSG_CANCEL = 1;
private static final int MSG_END_OF_SOURCE = 2;
private static final int MSG_IO_EXCEPTION = 3;
private static final int MSG_FATAL_ERROR = 4;

private final T loadable;
private final Loader.Callback<T> callback;
public final int defaultMinRetryCount;
Expand Down Expand Up @@ -390,4 +410,24 @@ private long getRetryDelayMillis() {

}

private static final class ReleaseTask extends Handler implements Runnable {

private final ReleaseCallback callback;

public ReleaseTask(ReleaseCallback callback) {
this.callback = callback;
}

@Override
public void run() {
sendEmptyMessage(0);
}

@Override
public void handleMessage(Message msg) {
callback.onLoaderReleased();
}

}

}

0 comments on commit 1b71e3b

Please sign in to comment.