diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e7a951e8..a61eb541 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -31,7 +31,7 @@ jobs: strategy: fail-fast: false matrix: - EMULATOR_API: [23, 25, 29] + EMULATOR_API: [24, 27, 29, 31, 34] steps: - uses: actions/checkout@v4 - uses: actions/setup-java@v4 @@ -54,10 +54,18 @@ jobs: arch: x86_64 profile: Nexus 6 emulator-options: -no-snapshot -no-window -no-boot-anim -camera-back none -camera-front none -gpu swiftshader_indirect - script: ./.github/workflows/emulator_script.sh + script: ./.github/workflows/emulator_script.sh logcat_${{ matrix.EMULATOR_API }}.txt + + - name: Upload emulator logs + uses: actions/upload-artifact@v4 + if: always() + with: + name: emulator_logs_${{ matrix.EMULATOR_API }} + path: ./logcat_${{ matrix.EMULATOR_API }}.txt - name: Upload emulator tests artifact uses: actions/upload-artifact@v4 + if: always() with: name: emulator_tests_${{ matrix.EMULATOR_API }} path: ./lib/build/reports/androidTests/connected/debug/ \ No newline at end of file diff --git a/.github/workflows/emulator_script.sh b/.github/workflows/emulator_script.sh index af4ea4bb..58a57fb1 100755 --- a/.github/workflows/emulator_script.sh +++ b/.github/workflows/emulator_script.sh @@ -1,8 +1,6 @@ #!/usr/bin/env bash -ADB_TAGS="Transcoder:I Engine:I" -ADB_TAGS="$ADB_TAGS DefaultVideoStrategy:I DefaultAudioStrategy:I" -ADB_TAGS="$ADB_TAGS VideoDecoderOutput:I VideoFrameDropper:I" -ADB_TAGS="$ADB_TAGS AudioEngine:I" adb logcat -c -adb logcat $ADB_TAGS *:E -v color & +adb logcat *:V > "$1" & +LOGCAT_PID=$! +trap "kill $LOGCAT_PID" EXIT ./gradlew lib:connectedCheck --stacktrace \ No newline at end of file diff --git a/lib/src/androidTest/assets/issue_102/sample.mp4 b/lib/src/androidTest/assets/issue_102/sample.mp4 new file mode 100644 index 00000000..8ce533f4 Binary files /dev/null and b/lib/src/androidTest/assets/issue_102/sample.mp4 differ diff --git a/lib/src/androidTest/assets/issue_184/transcode.3gp b/lib/src/androidTest/assets/issue_184/transcode.3gp new file mode 100644 index 00000000..71a06092 Binary files /dev/null and b/lib/src/androidTest/assets/issue_184/transcode.3gp differ diff --git a/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt b/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt index 68cf479b..6d127813 100644 --- a/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt +++ b/lib/src/androidTest/java/com/otaliastudios/transcoder/integration/IssuesTests.kt @@ -1,5 +1,6 @@ package com.otaliastudios.transcoder.integration +import android.media.MediaFormat import android.media.MediaMetadataRetriever.METADATA_KEY_DURATION import android.media.MediaMetadataRetriever import androidx.test.ext.junit.runners.AndroidJUnit4 @@ -7,10 +8,15 @@ import androidx.test.platform.app.InstrumentationRegistry import com.otaliastudios.transcoder.Transcoder import com.otaliastudios.transcoder.TranscoderListener import com.otaliastudios.transcoder.TranscoderOptions +import com.otaliastudios.transcoder.common.TrackType import com.otaliastudios.transcoder.internal.utils.Logger import com.otaliastudios.transcoder.source.AssetFileDescriptorDataSource import com.otaliastudios.transcoder.source.ClipDataSource import com.otaliastudios.transcoder.source.FileDescriptorDataSource +import com.otaliastudios.transcoder.strategy.DefaultVideoStrategy +import com.otaliastudios.transcoder.validator.WriteAlwaysValidator +import org.junit.Assume +import org.junit.AssumptionViolatedException import org.junit.Test import org.junit.runner.RunWith import java.io.File @@ -24,20 +30,21 @@ class IssuesTests { val context = InstrumentationRegistry.getInstrumentation().context fun output( - name: String = System.currentTimeMillis().toString(), - extension: String = "mp4" + name: String = System.currentTimeMillis().toString(), + extension: String = "mp4" ) = File(context.cacheDir, "$name.$extension").also { it.parentFile!!.mkdirs() } fun input(filename: String) = AssetFileDescriptorDataSource( - context.assets.openFd("issue_$issue/$filename") + context.assets.openFd("issue_$issue/$filename") ) fun transcode( - output: File = output(), - assertTranscoded: Boolean = true, - assertDuration: Boolean = true, - builder: TranscoderOptions.Builder.() -> Unit, - ): File { + output: File = output(), + assertTranscoded: Boolean = true, + assertDuration: Boolean = true, + builder: TranscoderOptions.Builder.() -> Unit, + ): File = runCatching { + Logger.setLogLevel(Logger.LEVEL_VERBOSE) val transcoder = Transcoder.into(output.absolutePath) transcoder.apply(builder) transcoder.setListener(object : TranscoderListener { @@ -60,11 +67,17 @@ class IssuesTests { retriever.release() } return output + }.getOrElse { + if (it.toString().contains("c2.android.avc.encoder was unable to create the input surface (1x1)")) { + log.w("Hit known emulator bug. Skipping the test.") + throw AssumptionViolatedException("Hit known emulator bug.") + } + throw it } } - @Test + @Test(timeout = 16000) fun issue137() = with(Helper(137)) { transcode { addDataSource(ClipDataSource(input("main.mp3"), 0L, 1000_000L)) @@ -88,4 +101,22 @@ class IssuesTests { } Unit } + + @Test(timeout = 16000) + fun issue184() = with(Helper(184)) { + transcode { + addDataSource(TrackType.VIDEO, input("transcode.3gp")) + setVideoTrackStrategy(DefaultVideoStrategy.exact(400, 400).build()) + } + Unit + } + + @Test(timeout = 16000) + fun issue102() = with(Helper(102)) { + transcode { + addDataSource(input("sample.mp4")) + setValidator(WriteAlwaysValidator()) + } + Unit + } } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/common/TrackType.kt b/lib/src/main/java/com/otaliastudios/transcoder/common/TrackType.kt index 2dbbf7d1..8fb4e1cc 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/common/TrackType.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/common/TrackType.kt @@ -2,8 +2,9 @@ package com.otaliastudios.transcoder.common import android.media.MediaFormat -enum class TrackType { - AUDIO, VIDEO +enum class TrackType(internal val displayName: String) { + AUDIO("Audio"), VIDEO("Video"); + } internal val MediaFormat.trackType get() = requireNotNull(trackTypeOrNull) { diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt index 9a138acb..f3c9724a 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Codecs.kt @@ -1,16 +1,18 @@ package com.otaliastudios.transcoder.internal import android.media.MediaCodec +import android.media.MediaCodecList import android.media.MediaFormat -import android.view.Surface +import android.opengl.EGL14 +import com.otaliastudios.opengl.core.EglCore +import com.otaliastudios.opengl.surface.EglWindowSurface import com.otaliastudios.transcoder.common.TrackStatus import com.otaliastudios.transcoder.common.TrackType -import com.otaliastudios.transcoder.internal.media.MediaFormatProvider +import com.otaliastudios.transcoder.internal.media.MediaFormatConstants import com.otaliastudios.transcoder.internal.utils.Logger import com.otaliastudios.transcoder.internal.utils.TrackMap -import com.otaliastudios.transcoder.internal.utils.trackMapOf -import com.otaliastudios.transcoder.source.DataSource -import com.otaliastudios.transcoder.strategy.TrackStrategy +import java.nio.ByteBuffer +import kotlin.properties.Delegates.observable /** * Encoders are shared between segments. This is not strictly needed but it is more efficient @@ -25,9 +27,52 @@ internal class Codecs( private val current: TrackMap ) { + class Surface( + private val context: EglCore, + val window: EglWindowSurface, + ) { + fun release() { + window.release() + context.release() + } + } + + class Codec(val codec: MediaCodec, val surface: Surface? = null, var log: Logger? = null) { + var dequeuedInputs by observable(0) { _, _, _ -> log?.v(state) } + var dequeuedOutputs by observable(0) { _, _, _ -> log?.v(state) } + val state get(): String = "dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs heldInputs=${heldInputs.size}" + + private val heldInputs = ArrayDeque>() + + fun getInputBuffer(): Pair? { + if (heldInputs.isNotEmpty()) { + return heldInputs.removeFirst().also { log?.v(state) } + } + val id = codec.dequeueInputBuffer(100) + return if (id >= 0) { + dequeuedInputs++ + val buf = checkNotNull(codec.getInputBuffer(id)) { "inputBuffer($id) should not be null." } + buf to id + } else { + log?.i("buffer() failed with $id. $state") + null + } + } + + /** + * When we're not ready to write into this buffer, it can be held for later. + * Previously we were returning it to the codec with timestamp=0, flags=0, but especially + * on older Android versions that can create subtle issues. + * It's better to just keep the buffer here and reuse it on the next [getInputBuffer] call. + */ + fun holdInputBuffer(buffer: ByteBuffer, id: Int) { + heldInputs.addLast(buffer to id) + } + } + private val log = Logger("Codecs") - val encoders = object : TrackMap> { + val encoders = object : TrackMap { override fun has(type: TrackType) = tracks.all[type] == TrackStatus.COMPRESSING @@ -35,14 +80,45 @@ internal class Codecs( val format = tracks.outputFormats.audio val codec = MediaCodec.createEncoderByType(format.getString(MediaFormat.KEY_MIME)!!) codec.configure(format, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE) - codec to null + Codec(codec, null) } private val lazyVideo by lazy { val format = tracks.outputFormats.video + val width = format.getInteger(MediaFormat.KEY_WIDTH) + val height = format.getInteger(MediaFormat.KEY_HEIGHT) + log.i("Destination video surface size: ${width}x${height} @ ${format.getInteger(MediaFormatConstants.KEY_ROTATION_DEGREES)}") + log.i("Destination video format: $format") + + val allCodecs = MediaCodecList(MediaCodecList.REGULAR_CODECS) + val videoEncoders = allCodecs.codecInfos.filter { it.isEncoder && it.supportedTypes.any { it.startsWith("video/") } } + log.i("Available encoders: ${videoEncoders.joinToString { "${it.name} (${it.supportedTypes.joinToString()})" }}") + + // Could consider MediaCodecList(MediaCodecList.REGULAR_CODECS).findEncoderForFormat(format) + // But it's trickier, for example, format should not include frame rate on API 21 and maybe other quirks. val codec = MediaCodec.createEncoderByType(format.getString(MediaFormat.KEY_MIME)!!) codec.configure(format, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE) - codec to codec.createInputSurface() + log.i("Selected encoder ${codec.name}") + val surface = codec.createInputSurface() + + log.i("Creating OpenGL context on ${Thread.currentThread()} (${surface.isValid})") + val eglContext = EglCore(EGL14.EGL_NO_CONTEXT, EglCore.FLAG_RECORDABLE) + val eglWindow = EglWindowSurface(eglContext, surface, true) + eglWindow.makeCurrent() + + // On API28 (possibly others) emulator, this happens. If we don't throw early, it fails later with unclear + // errors - a tombstone dump saying that src.width() & 1 == 0 (basically, complains that surface size is odd) + // and an error much later on during encoder's dequeue. Surface size is odd because it's 1x1. + val (eglWidth, eglHeight) = eglWindow.getWidth() to eglWindow.getHeight() + if (eglWidth != width || eglHeight != height) { + log.e("OpenGL surface has wrong size (expected: ${width}x${height}, found: ${eglWindow.getWidth()}x${eglWindow.getHeight()}).") + // Throw a clear error in this very specific scenario so we can catch it in tests. + if (codec.name == "c2.android.avc.encoder" && eglWidth == 1 && eglHeight == 1) { + error("c2.android.avc.encoder was unable to create the input surface (1x1).") + } + } + + Codec(codec, Surface(eglContext, eglWindow)) } override fun get(type: TrackType) = when (type) { @@ -63,7 +139,7 @@ internal class Codecs( fun release() { encoders.forEach { - it.first.release() + it.surface?.release() } } } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt index 50018ada..74adba3e 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segment.kt @@ -3,7 +3,6 @@ package com.otaliastudios.transcoder.internal import com.otaliastudios.transcoder.common.TrackType import com.otaliastudios.transcoder.internal.pipeline.Pipeline import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger internal class Segment( val type: TrackType, @@ -11,7 +10,7 @@ internal class Segment( private val pipeline: Pipeline, ) { - private val log = Logger("Segment($type,$index)") + // private val log = Logger("Segment($type,$index)") private var state: State? = null fun advance(): Boolean { @@ -20,15 +19,14 @@ internal class Segment( } fun canAdvance(): Boolean { - log.v("canAdvance(): state=$state") + // log.v("canAdvance(): state=$state") return state == null || state !is State.Eos } fun needsSleep(): Boolean { when(val s = state ?: return false) { is State.Ok -> return false - is State.Retry -> return false - is State.Wait -> return s.sleep + is State.Failure -> return s.sleep } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt index bdcc31a1..fc666547 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Segments.kt @@ -9,9 +9,9 @@ import com.otaliastudios.transcoder.internal.utils.TrackMap import com.otaliastudios.transcoder.internal.utils.mutableTrackMapOf internal class Segments( - private val sources: DataSources, - private val tracks: Tracks, - private val factory: (TrackType, Int, TrackStatus, MediaFormat) -> Pipeline + private val sources: DataSources, + private val tracks: Tracks, + private val factory: (TrackType, Int, Int, TrackStatus, MediaFormat) -> Pipeline ) { private val log = Logger("Segments") @@ -21,7 +21,7 @@ internal class Segments( fun hasNext(type: TrackType): Boolean { if (!sources.has(type)) return false - log.v("hasNext($type): segment=${current.getOrNull(type)} lastIndex=${sources.getOrNull(type)?.lastIndex} canAdvance=${current.getOrNull(type)?.canAdvance()}") + // log.v("hasNext($type): segment=${current.getOrNull(type)} lastIndex=${sources.getOrNull(type)?.lastIndex} canAdvance=${current.getOrNull(type)?.canAdvance()}") val segment = current.getOrNull(type) ?: return true // not started val lastIndex = sources.getOrNull(type)?.lastIndex ?: return false // no track! return segment.canAdvance() || segment.index < lastIndex @@ -85,15 +85,16 @@ internal class Segments( // who check it during pipeline init. currentIndex[type] = index val pipeline = factory( - type, - index, - tracks.all[type], - tracks.outputFormats[type] + type, + index, + sources[type].size, + tracks.all[type], + tracks.outputFormats[type] ) return Segment( - type = type, - index = index, - pipeline = pipeline + type = type, + index = index, + pipeline = pipeline ).also { current[type] = it } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/Timer.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/Timer.kt index 9d9b822e..f3030e7f 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/Timer.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/Timer.kt @@ -7,10 +7,10 @@ import com.otaliastudios.transcoder.source.DataSource import com.otaliastudios.transcoder.time.TimeInterpolator internal class Timer( - private val interpolator: TimeInterpolator, - private val sources: DataSources, - private val tracks: Tracks, - private val current: TrackMap + private val interpolator: TimeInterpolator, + private val sources: DataSources, + private val tracks: Tracks, + private val current: TrackMap ) { private val log = Logger("Timer") @@ -55,7 +55,7 @@ internal class Timer( } } - private val interpolators = mutableMapOf, TimeInterpolator>() + private val interpolators = mutableMapOf, SegmentInterpolator>() fun localize(type: TrackType, index: Int, positionUs: Long): Long? { if (!tracks.active.has(type)) return null @@ -68,27 +68,40 @@ internal class Timer( return localizedUs } - fun interpolator(type: TrackType, index: Int) = interpolators.getOrPut(type to index) { - object : TimeInterpolator { + fun interpolator(type: TrackType, index: Int): SegmentInterpolator = interpolators.getOrPut(type to index) { + SegmentInterpolator( + log = Logger("${type.displayName}Interpolator$index/${sources[type].size}"), + user = interpolator, + previous = if (index == 0) null else interpolator(type, index - 1) + ) + } + + class SegmentInterpolator( + private val log: Logger, + private val user: TimeInterpolator, + previous: SegmentInterpolator?, + ) : TimeInterpolator { - private var lastOut = 0L - private var firstIn = Long.MAX_VALUE - private val firstOut = when { - index == 0 -> 0L - else -> { - // Add 10 just so they're not identical. - val previous = interpolators[type to index - 1]!! - previous.interpolate(type, Long.MAX_VALUE) + 10L - } + private var inputBase = Long.MIN_VALUE + private var interpolatedLast = Long.MIN_VALUE + private var outputLast = Long.MIN_VALUE + private val outputBase by lazy { + when (previous) { + null -> 0L + // Not interpolated by user, so we give user interpolator a consistent stream. + // Add a bit of distance just so they're not identical, won't be noticeable. + else -> previous.outputLast + 1L + }.also { + log.i("Found output base timestamp: $it") } + } - override fun interpolate(type: TrackType, time: Long) = when (time) { - Long.MAX_VALUE -> lastOut - else -> { - if (firstIn == Long.MAX_VALUE) firstIn = time - lastOut = firstOut + (time - firstIn) - interpolator.interpolate(type, lastOut) - } + override fun interpolate(type: TrackType, time: Long): Long { + if (inputBase == Long.MIN_VALUE) inputBase = time + outputLast = outputBase + (time - inputBase) + return user.interpolate(type, outputLast).also { + check(it > interpolatedLast) { "Timestamps must be monotonically increasing: $it, $interpolatedLast" } + interpolatedLast = it } } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt index 83cf20bb..33e03c5d 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/AudioEngine.kt @@ -6,11 +6,8 @@ import android.view.Surface import com.otaliastudios.transcoder.internal.audio.remix.AudioRemixer import com.otaliastudios.transcoder.internal.codec.* import com.otaliastudios.transcoder.internal.pipeline.* -import com.otaliastudios.transcoder.internal.utils.Logger -import com.otaliastudios.transcoder.internal.utils.trackMapOf import com.otaliastudios.transcoder.resample.AudioResampler import com.otaliastudios.transcoder.stretch.AudioStretcher -import java.util.concurrent.atomic.AtomicInteger import kotlin.math.ceil import kotlin.math.floor @@ -19,15 +16,10 @@ import kotlin.math.floor * remixing, stretching. TODO: With some extra work this could be split in different steps. */ internal class AudioEngine( - private val stretcher: AudioStretcher, - private val resampler: AudioResampler, - private val targetFormat: MediaFormat -): QueuedStep(), DecoderChannel { - - companion object { - private val ID = AtomicInteger(0) - } - private val log = Logger("AudioEngine(${ID.getAndIncrement()})") + private val stretcher: AudioStretcher, + private val resampler: AudioResampler, + private val targetFormat: MediaFormat +): QueuedStep("AudioEngine"), DecoderChannel { override val channel = this private val buffers = ShortBuffers() @@ -35,8 +27,9 @@ internal class AudioEngine( private val MediaFormat.sampleRate get() = getInteger(KEY_SAMPLE_RATE) private val MediaFormat.channels get() = getInteger(KEY_CHANNEL_COUNT) + private val chunks = ChunkQueue(log) + private var readyToDrain = false private lateinit var rawFormat: MediaFormat - private lateinit var chunks: ChunkQueue private lateinit var remixer: AudioRemixer override fun handleSourceFormat(sourceFormat: MediaFormat): Surface? = null @@ -44,37 +37,40 @@ internal class AudioEngine( override fun handleRawFormat(rawFormat: MediaFormat) { log.i("handleRawFormat($rawFormat)") this.rawFormat = rawFormat - remixer = AudioRemixer[rawFormat.channels, targetFormat.channels] - chunks = ChunkQueue(rawFormat.sampleRate, rawFormat.channels) + this.remixer = AudioRemixer[rawFormat.channels, targetFormat.channels] + this.readyToDrain = true } override fun enqueueEos(data: DecoderData) { - log.i("enqueueEos()") + log.i("enqueueEos (${chunks.size} in queue)") data.release(false) chunks.enqueueEos() } override fun enqueue(data: DecoderData) { val stretch = (data as? DecoderTimerData)?.timeStretch ?: 1.0 - chunks.enqueue(data.buffer.asShortBuffer(), data.timeUs, stretch) { - data.release(false) - } + chunks.enqueue(data.buffer.asShortBuffer(), data.timeUs, stretch) { data.release(false) } } override fun drain(): State { + if (!readyToDrain) { + log.i("drain(): not ready, waiting... (${chunks.size} in queue)") + return State.Retry(false) + } if (chunks.isEmpty()) { // nothing was enqueued log.i("drain(): no chunks, waiting...") - return State.Wait(false) + return State.Retry(false) } val (outBytes, outId) = next.buffer() ?: return run { // dequeueInputBuffer failed - log.i("drain(): no next buffer, waiting...") - State.Wait(true) + log.i("drain(): no next buffer, waiting... (${chunks.size} in queue)") + State.Retry(true) } val outBuffer = outBytes.asShortBuffer() return chunks.drain( - eos = State.Eos(EncoderData(outBytes, outId, 0)) + eos = State.Eos(EncoderData(outBytes, outId, 0)), + format = rawFormat ) { inBuffer, timeUs, stretch -> val outSize = outBuffer.remaining() val inSize = inBuffer.remaining() @@ -105,15 +101,17 @@ internal class AudioEngine( // Resample resampler.resample( - remixBuffer, rawFormat.sampleRate, - outBuffer, targetFormat.sampleRate, - targetFormat.channels) + remixBuffer, rawFormat.sampleRate, + outBuffer, targetFormat.sampleRate, + targetFormat.channels + ) outBuffer.flip() // Adjust position and dispatch. outBytes.clear() outBytes.limit(outBuffer.limit() * BYTES_PER_SHORT) outBytes.position(outBuffer.position() * BYTES_PER_SHORT) + log.v("drain(): passing buffer $outId to encoder... ${chunks.size} in queue") State.Ok(EncoderData(outBytes, outId, timeUs)) } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt index b8a641bf..cf8008a3 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/audio/chunks.kt @@ -1,5 +1,10 @@ package com.otaliastudios.transcoder.internal.audio +import android.media.MediaFormat +import android.media.MediaFormat.KEY_CHANNEL_COUNT +import android.media.MediaFormat.KEY_SAMPLE_RATE +import com.otaliastudios.transcoder.internal.utils.Logger +import java.nio.ByteBuffer import java.nio.ShortBuffer private data class Chunk( @@ -19,15 +24,24 @@ private data class Chunk( * big enough to contain the full processed size, in which case we want to consume only * part of the input buffer and keep it available for the next cycle. */ -internal class ChunkQueue(private val sampleRate: Int, private val channels: Int) { +internal class ChunkQueue(private val log: Logger) { private val queue = ArrayDeque() + private val pool = ShortBufferPool() fun isEmpty() = queue.isEmpty() + val size get() = queue.size fun enqueue(buffer: ShortBuffer, timeUs: Long, timeStretch: Double, release: () -> Unit) { if (buffer.hasRemaining()) { - queue.addLast(Chunk(buffer, timeUs, timeStretch, release)) + if (queue.size >= 3) { + val copy = pool.take(buffer) + queue.addLast(Chunk(copy, timeUs, timeStretch, { pool.give(copy) })) + release() + } else { + queue.addLast(Chunk(buffer, timeUs, timeStretch, release)) + } } else { + log.w("enqueued invalid buffer ($timeUs, ${buffer.capacity()})") release() } } @@ -36,7 +50,7 @@ internal class ChunkQueue(private val sampleRate: Int, private val channels: Int queue.addLast(Chunk.Eos) } - fun drain(eos: T, action: (buffer: ShortBuffer, timeUs: Long, timeStretch: Double) -> T): T { + fun drain(format: MediaFormat, eos: T, action: (buffer: ShortBuffer, timeUs: Long, timeStretch: Double) -> T): T { val head = queue.removeFirst() if (head === Chunk.Eos) return eos @@ -46,14 +60,48 @@ internal class ChunkQueue(private val sampleRate: Int, private val channels: Int // Action can reduce the limit for any reason. Restore it before comparing sizes. head.buffer.limit(limit) if (head.buffer.hasRemaining()) { + // We could technically hold onto the same chunk, but in practice it's better to + // release input buffers back to the decoder otherwise it can get stuck val consumed = size - head.buffer.remaining() + val sampleRate = format.getInteger(KEY_SAMPLE_RATE) + val channelCount = format.getInteger(KEY_CHANNEL_COUNT) + val buffer = pool.take(head.buffer) + head.release() queue.addFirst(head.copy( - timeUs = shortsToUs(consumed, sampleRate, channels) + timeUs = shortsToUs(consumed, sampleRate, channelCount), + release = { pool.give(buffer) }, + buffer = buffer )) + log.v("drain(): partially handled chunk at ${head.timeUs}us, ${head.buffer.remaining()} bytes left (${queue.size})") } else { // buffer consumed! + log.v("drain(): consumed chunk at ${head.timeUs}us (${queue.size + 1} => ${queue.size})") head.release() } return result } } + + +class ShortBufferPool { + private val pool = mutableListOf() + + fun take(original: ShortBuffer): ShortBuffer { + val needed = original.remaining() + val index = pool.indexOfFirst { it.capacity() >= needed } + val memory = when { + index >= 0 -> pool.removeAt(index) + else -> ByteBuffer.allocateDirect((needed * Short.SIZE_BYTES).coerceAtLeast(1024)) + .order(original.order()) + .asShortBuffer() + } + memory.put(original) + memory.flip() + return memory + } + + fun give(buffer: ShortBuffer) { + buffer.clear() + pool.add(buffer) + } +} \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt index 532037f6..635536a6 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Decoder.kt @@ -3,26 +3,21 @@ package com.otaliastudios.transcoder.internal.codec import android.media.MediaCodec.* import android.media.MediaFormat import android.view.Surface +import com.otaliastudios.transcoder.common.TrackType import com.otaliastudios.transcoder.common.trackType +import com.otaliastudios.transcoder.internal.Codecs import com.otaliastudios.transcoder.internal.data.ReaderChannel import com.otaliastudios.transcoder.internal.data.ReaderData -import com.otaliastudios.transcoder.internal.media.MediaCodecBuffers -import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.QueuedStep import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger -import com.otaliastudios.transcoder.internal.utils.trackMapOf import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicInteger -import kotlin.properties.Delegates -import kotlin.properties.Delegates.observable internal open class DecoderData( - val buffer: ByteBuffer, - val timeUs: Long, - val release: (render: Boolean) -> Unit + val buffer: ByteBuffer, + val timeUs: Long, + val release: (render: Boolean) -> Unit ) internal interface DecoderChannel : Channel { @@ -31,94 +26,89 @@ internal interface DecoderChannel : Channel { } internal class Decoder( - private val format: MediaFormat, // source.getTrackFormat(track) - continuous: Boolean, // relevant if the source sends no-render chunks. should we compensate or not? -) : QueuedStep(), ReaderChannel { - - companion object { - private val ID = trackMapOf(AtomicInteger(0), AtomicInteger(0)) + private val format: MediaFormat, // source.getTrackFormat(track) + continuous: Boolean, // relevant if the source sends no-render chunks. should we compensate or not? +) : QueuedStep( + when (format.trackType) { + TrackType.VIDEO -> "VideoDecoder" + TrackType.AUDIO -> "AudioDecoder" } +), ReaderChannel { - private val log = Logger("Decoder(${format.trackType},${ID[format.trackType].getAndIncrement()})") override val channel = this - private val codec = createDecoderByType(format.getString(MediaFormat.KEY_MIME)!!) - private val buffers by lazy { MediaCodecBuffers(codec) } + init { + log.i("init: instantiating codec...") + } + private val decoder = Codecs.Codec(createDecoderByType(format.getString(MediaFormat.KEY_MIME)!!), null, log) private var info = BufferInfo() private val dropper = DecoderDropper(continuous) - private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() } - private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() } - private fun printDequeued() { - // log.v("dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") - } + private var surfaceRendering = false + private val surfaceRenderingDummyBuffer = ByteBuffer.allocateDirect(0) override fun initialize(next: DecoderChannel) { super.initialize(next) log.i("initialize()") val surface = next.handleSourceFormat(format) - codec.configure(format, surface, null, 0) - codec.start() + surfaceRendering = surface != null + decoder.codec.configure(format, surface, null, 0) + decoder.codec.start() } - override fun buffer(): Pair? { - val id = codec.dequeueInputBuffer(100) - return if (id >= 0) { - dequeuedInputs++ - buffers.getInputBuffer(id) to id - } else { - log.i("buffer() failed. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") - null - } - } + override fun buffer(): Pair? = decoder.getInputBuffer() override fun enqueueEos(data: ReaderData) { log.i("enqueueEos()!") - dequeuedInputs-- - val flag = BUFFER_FLAG_END_OF_STREAM - codec.queueInputBuffer(data.id, 0, 0, 0, flag) + decoder.dequeuedInputs-- + decoder.codec.queueInputBuffer(data.id, 0, 0, 0, BUFFER_FLAG_END_OF_STREAM) } override fun enqueue(data: ReaderData) { - dequeuedInputs-- + decoder.dequeuedInputs-- val (chunk, id) = data val flag = if (chunk.keyframe) BUFFER_FLAG_SYNC_FRAME else 0 - codec.queueInputBuffer(id, chunk.buffer.position(), chunk.buffer.remaining(), chunk.timeUs, flag) + log.v("enqueued ${chunk.buffer.remaining()} bytes (${chunk.timeUs}us)") + decoder.codec.queueInputBuffer(id, chunk.buffer.position(), chunk.buffer.remaining(), chunk.timeUs, flag) dropper.input(chunk.timeUs, chunk.render) } override fun drain(): State { - val result = codec.dequeueOutputBuffer(info, 100) + val result = decoder.codec.dequeueOutputBuffer(info, 100) return when (result) { INFO_TRY_AGAIN_LATER -> { log.i("drain(): got INFO_TRY_AGAIN_LATER, waiting.") - State.Wait(true) + State.Retry(true) } INFO_OUTPUT_FORMAT_CHANGED -> { - log.i("drain(): got INFO_OUTPUT_FORMAT_CHANGED, handling format and retrying. format=${codec.outputFormat}") - next.handleRawFormat(codec.outputFormat) - State.Retry + log.i("drain(): got INFO_OUTPUT_FORMAT_CHANGED, handling format and retrying. format=${decoder.codec.outputFormat}") + next.handleRawFormat(decoder.codec.outputFormat) + drain() } INFO_OUTPUT_BUFFERS_CHANGED -> { log.i("drain(): got INFO_OUTPUT_BUFFERS_CHANGED, retrying.") - buffers.onOutputBuffersChanged() - State.Retry + drain() } else -> { val isEos = info.flags and BUFFER_FLAG_END_OF_STREAM != 0 val timeUs = if (isEos) 0 else dropper.output(info.presentationTimeUs) if (timeUs != null /* && (isEos || info.size > 0) */) { - dequeuedOutputs++ - val buffer = buffers.getOutputBuffer(result) + val codecBuffer = decoder.codec.getOutputBuffer(result) + val buffer = when { + codecBuffer != null -> codecBuffer + surfaceRendering -> surfaceRenderingDummyBuffer // happens, at least on API28 emulator + else -> error("outputBuffer($result, ${info.size}, ${info.offset}, ${info.flags}) should not be null.") + } + decoder.dequeuedOutputs++ val data = DecoderData(buffer, timeUs) { - codec.releaseOutputBuffer(result, it) - dequeuedOutputs-- + decoder.codec.releaseOutputBuffer(result, it) + decoder.dequeuedOutputs-- } if (isEos) State.Eos(data) else State.Ok(data) } else { // frame was dropped, no need to sleep - codec.releaseOutputBuffer(result, false) - State.Wait(false) + decoder.codec.releaseOutputBuffer(result, false) + State.Retry(false) }.also { log.v("drain(): returning $it") } @@ -127,8 +117,8 @@ internal class Decoder( } override fun release() { - log.i("release(): releasing codec. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") - codec.stop() - codec.release() + log.i("release: releasing codec. ${decoder.state}") + decoder.codec.stop() + decoder.codec.release() } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/DecoderTimer.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/DecoderTimer.kt index 4c9459a6..6b5adf9c 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/DecoderTimer.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/DecoderTimer.kt @@ -1,7 +1,7 @@ package com.otaliastudios.transcoder.internal.codec import com.otaliastudios.transcoder.common.TrackType -import com.otaliastudios.transcoder.internal.pipeline.DataStep +import com.otaliastudios.transcoder.internal.pipeline.TransformStep import com.otaliastudios.transcoder.internal.pipeline.State import com.otaliastudios.transcoder.time.TimeInterpolator import java.nio.ByteBuffer @@ -15,38 +15,38 @@ internal class DecoderTimerData( ) : DecoderData(buffer, timeUs, release) internal class DecoderTimer( - private val track: TrackType, - private val interpolator: TimeInterpolator, -) : DataStep() { + private val track: TrackType, + private val interpolator: TimeInterpolator, +) : TransformStep("DecoderTimer") { - private var lastTimeUs: Long? = null - private var lastRawTimeUs: Long? = null + private var lastTimeUs: Long = Long.MIN_VALUE + private var lastRawTimeUs: Long = Long.MIN_VALUE - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { if (state is State.Eos) return state require(state.value !is DecoderTimerData) { "Can't apply DecoderTimer twice." } val rawTimeUs = state.value.timeUs val timeUs = interpolator.interpolate(track, rawTimeUs) - val timeStretch = if (lastTimeUs == null) { + val timeStretch = if (lastTimeUs == Long.MIN_VALUE) { 1.0 } else { // TODO to be exact, timeStretch should be computed by comparing the NEXT timestamps // with this, instead of comparing this with the PREVIOUS - val durationUs = timeUs - lastTimeUs!! - val rawDurationUs = rawTimeUs - lastRawTimeUs!! + val durationUs = timeUs - lastTimeUs + val rawDurationUs = rawTimeUs - lastRawTimeUs durationUs.toDouble() / rawDurationUs } lastTimeUs = timeUs lastRawTimeUs = rawTimeUs return State.Ok(DecoderTimerData( - buffer = state.value.buffer, - rawTimeUs = rawTimeUs, - timeUs = timeUs, - timeStretch = timeStretch, - release = state.value.release + buffer = state.value.buffer, + rawTimeUs = rawTimeUs, + timeUs = timeUs, + timeStretch = timeStretch, + release = state.value.release )) } } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt index e1ebae41..38fcfdfc 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/codec/Encoder.kt @@ -1,23 +1,14 @@ package com.otaliastudios.transcoder.internal.codec -import android.media.MediaCodec import android.media.MediaCodec.* -import android.view.Surface import com.otaliastudios.transcoder.common.TrackType -import com.otaliastudios.transcoder.common.trackType import com.otaliastudios.transcoder.internal.Codecs import com.otaliastudios.transcoder.internal.data.WriterChannel import com.otaliastudios.transcoder.internal.data.WriterData -import com.otaliastudios.transcoder.internal.media.MediaCodecBuffers import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.QueuedStep import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger -import com.otaliastudios.transcoder.internal.utils.trackMapOf import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicInteger -import kotlin.properties.Delegates -import kotlin.properties.Delegates.observable internal data class EncoderData( val buffer: ByteBuffer?, // If present, it must have correct position/remaining! @@ -28,72 +19,56 @@ internal data class EncoderData( } internal interface EncoderChannel : Channel { - val surface: Surface? + val surface: Codecs.Surface? fun buffer(): Pair? } internal class Encoder( - private val codec: MediaCodec, - override val surface: Surface?, - ownsCodecStart: Boolean, - private val ownsCodecStop: Boolean, -) : QueuedStep(), EncoderChannel { + private val encoder: Codecs.Codec, + ownsCodecStart: Boolean, + private val ownsCodecStop: Boolean, +) : QueuedStep( + when (encoder.surface) { + null -> "AudioEncoder" + else -> "VideoEncoder" + } +), EncoderChannel { constructor(codecs: Codecs, type: TrackType) : this( - codecs.encoders[type].first, - codecs.encoders[type].second, - codecs.ownsEncoderStart[type], - codecs.ownsEncoderStop[type] + codecs.encoders[type], + codecs.ownsEncoderStart[type], + codecs.ownsEncoderStop[type] ) - companion object { - private val ID = trackMapOf(AtomicInteger(0), AtomicInteger(0)) - } - - private val type = if (surface != null) TrackType.VIDEO else TrackType.AUDIO - private val log = Logger("Encoder(${type},${ID[type].getAndIncrement()})") - private var dequeuedInputs by observable(0) { _, _, _ -> printDequeued() } - private var dequeuedOutputs by observable(0) { _, _, _ -> printDequeued() } - private fun printDequeued() { - log.v("dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") - } - + override val surface: Codecs.Surface? get() = encoder.surface override val channel = this - private val buffers by lazy { MediaCodecBuffers(codec) } - private var info = BufferInfo() - init { - log.i("Encoder: ownsStart=$ownsCodecStart ownsStop=$ownsCodecStop") + encoder.log = log + log.i("ownsStart=$ownsCodecStart ownsStop=$ownsCodecStop ${encoder.state}") if (ownsCodecStart) { - codec.start() + encoder.codec.start() } } - override fun buffer(): Pair? { - val id = codec.dequeueInputBuffer(100) - return if (id >= 0) { - dequeuedInputs++ - buffers.getInputBuffer(id) to id - } else { - log.i("buffer() failed. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") - null - } - } + override fun buffer(): Pair? = encoder.getInputBuffer() private var eosReceivedButNotEnqueued = false override fun enqueueEos(data: EncoderData) { if (surface == null) { - if (!ownsCodecStop) eosReceivedButNotEnqueued = true - val flag = if (!ownsCodecStop) 0 else BUFFER_FLAG_END_OF_STREAM - codec.queueInputBuffer(data.id, 0, 0, 0, flag) - dequeuedInputs-- + if (ownsCodecStop) { + encoder.codec.queueInputBuffer(data.id, 0, 0, 0, BUFFER_FLAG_END_OF_STREAM) + encoder.dequeuedInputs-- + } else { + eosReceivedButNotEnqueued = true + encoder.holdInputBuffer(data.buffer!!, data.id) + } } else { if (!ownsCodecStop) eosReceivedButNotEnqueued = true - else codec.signalEndOfInputStream() + else encoder.codec.signalEndOfInputStream() } } @@ -101,52 +76,51 @@ internal class Encoder( if (surface != null) return else { val buffer = requireNotNull(data.buffer) { "Audio should always pass a buffer to Encoder." } - codec.queueInputBuffer(data.id, buffer.position(), buffer.remaining(), data.timeUs, 0) - dequeuedInputs-- + encoder.codec.queueInputBuffer(data.id, buffer.position(), buffer.remaining(), data.timeUs, 0) + encoder.dequeuedInputs-- } } override fun drain(): State { val timeoutUs = if (eosReceivedButNotEnqueued) 5000L else 100L - return when (val result = codec.dequeueOutputBuffer(info, timeoutUs)) { + return when (val result = encoder.codec.dequeueOutputBuffer(info, timeoutUs)) { INFO_TRY_AGAIN_LATER -> { if (eosReceivedButNotEnqueued) { // Horrible hack. When we don't own the MediaCodec, we can't enqueue EOS so we // can't dequeue them. INFO_TRY_AGAIN_LATER is returned. We assume this means EOS. - log.i("Sending fake Eos. dequeuedInputs=$dequeuedInputs dequeuedOutputs=$dequeuedOutputs") + log.i("Sending fake Eos. ${encoder.state}") val buffer = ByteBuffer.allocateDirect(0) State.Eos(WriterData(buffer, 0L, 0) {}) } else { log.i("Can't dequeue output buffer: INFO_TRY_AGAIN_LATER") - State.Wait(true) + State.Retry(true) } } INFO_OUTPUT_FORMAT_CHANGED -> { - log.i("INFO_OUTPUT_FORMAT_CHANGED! format=${codec.outputFormat}") - next.handleFormat(codec.outputFormat) - State.Retry + log.i("INFO_OUTPUT_FORMAT_CHANGED! format=${encoder.codec.outputFormat}") + next.handleFormat(encoder.codec.outputFormat) + drain() } INFO_OUTPUT_BUFFERS_CHANGED -> { - buffers.onOutputBuffersChanged() - State.Retry + drain() } else -> { val isConfig = info.flags and BUFFER_FLAG_CODEC_CONFIG != 0 if (isConfig) { - codec.releaseOutputBuffer(result, false) - State.Retry + encoder.codec.releaseOutputBuffer(result, false) + drain() } else { - dequeuedOutputs++ + encoder.dequeuedOutputs++ val isEos = info.flags and BUFFER_FLAG_END_OF_STREAM != 0 val flags = info.flags and BUFFER_FLAG_END_OF_STREAM.inv() - val buffer = buffers.getOutputBuffer(result) + val buffer = checkNotNull(encoder.codec.getOutputBuffer(result)) { "outputBuffer($result) should not be null." } val timeUs = info.presentationTimeUs buffer.clear() buffer.limit(info.offset + info.size) buffer.position(info.offset) val data = WriterData(buffer, timeUs, flags) { - codec.releaseOutputBuffer(result, false) - dequeuedOutputs-- + encoder.codec.releaseOutputBuffer(result, false) + encoder.dequeuedOutputs-- } if (isEos) State.Eos(data) else State.Ok(data) } @@ -155,9 +129,9 @@ internal class Encoder( } override fun release() { - log.i("release(): ownsStop=$ownsCodecStop dequeuedInputs=${dequeuedInputs} dequeuedOutputs=$dequeuedOutputs") + log.i("release(): ownsStop=$ownsCodecStop ${encoder.state}") if (ownsCodecStop) { - codec.stop() + encoder.codec.stop() } } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Bridge.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Bridge.kt index 5a75f657..8bd3529b 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Bridge.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Bridge.kt @@ -2,6 +2,7 @@ package com.otaliastudios.transcoder.internal.data import android.media.MediaCodec import android.media.MediaFormat +import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.State import com.otaliastudios.transcoder.internal.pipeline.Step import com.otaliastudios.transcoder.internal.utils.Logger @@ -9,9 +10,8 @@ import java.nio.ByteBuffer import java.nio.ByteOrder internal class Bridge(private val format: MediaFormat) - : Step, ReaderChannel { + : BaseStep("Bridge"), ReaderChannel { - private val log = Logger("Bridge") private val bufferSize = format.getInteger(MediaFormat.KEY_MAX_INPUT_SIZE) private val buffer = ByteBuffer.allocateDirect(bufferSize).order(ByteOrder.nativeOrder()) override val channel = this @@ -27,7 +27,7 @@ internal class Bridge(private val format: MediaFormat) } // Can't do much about chunk.render, since we don't even decode. - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { val (chunk, _) = state.value val flags = if (chunk.keyframe) MediaCodec.BUFFER_FLAG_SYNC_FRAME else 0 val result = WriterData(chunk.buffer, chunk.timeUs, flags) {} diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt index fd5248d7..c663f6b7 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Reader.kt @@ -4,7 +4,6 @@ import com.otaliastudios.transcoder.common.TrackType import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.utils.Logger import com.otaliastudios.transcoder.source.DataSource import java.nio.ByteBuffer @@ -16,11 +15,10 @@ internal interface ReaderChannel : Channel { } internal class Reader( - private val source: DataSource, - private val track: TrackType -) : BaseStep() { + private val source: DataSource, + private val track: TrackType +) : BaseStep("Reader") { - private val log = Logger("Reader") override val channel = Channel private val chunk = DataSource.Chunk() @@ -29,13 +27,13 @@ internal class Reader( if (buffer == null) { // dequeueInputBuffer failed log.v("Returning State.Wait because buffer is null.") - return State.Wait(true) + return State.Retry(true) } else { return action(buffer.first, buffer.second) } } - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { return if (source.isDrained) { log.i("Source is drained! Returning Eos as soon as possible.") nextBufferOrWait { byteBuffer, id -> @@ -47,11 +45,12 @@ internal class Reader( } } else if (!source.canReadTrack(track)) { log.i("Returning State.Wait because source can't read $track right now.") - State.Wait(false) + State.Retry(false) } else { nextBufferOrWait { byteBuffer, id -> chunk.buffer = byteBuffer source.readTrack(chunk) + // log.v("Returning ${chunk.buffer?.remaining() ?: -1} bytes from source") State.Ok(ReaderData(chunk, id)) } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/ReaderTimer.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/ReaderTimer.kt index 57a48562..b0996afe 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/ReaderTimer.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/ReaderTimer.kt @@ -1,15 +1,15 @@ package com.otaliastudios.transcoder.internal.data import com.otaliastudios.transcoder.common.TrackType -import com.otaliastudios.transcoder.internal.pipeline.DataStep +import com.otaliastudios.transcoder.internal.pipeline.TransformStep import com.otaliastudios.transcoder.internal.pipeline.State import com.otaliastudios.transcoder.time.TimeInterpolator internal class ReaderTimer( - private val track: TrackType, - private val interpolator: TimeInterpolator -) : DataStep() { - override fun step(state: State.Ok, fresh: Boolean): State { + private val track: TrackType, + private val interpolator: TimeInterpolator +) : TransformStep("ReaderTimer") { + override fun advance(state: State.Ok): State { if (state is State.Eos) return state state.value.chunk.timeUs = interpolator.interpolate(track, state.value.chunk.timeUs) return state diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Seeker.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Seeker.kt index 610f5647..4e2ade8d 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Seeker.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Seeker.kt @@ -9,16 +9,15 @@ import com.otaliastudios.transcoder.source.DataSource import java.nio.ByteBuffer internal class Seeker( - private val source: DataSource, - positions: List, - private val seek: (Long) -> Boolean -) : BaseStep() { + private val source: DataSource, + positions: List, + private val seek: (Long) -> Boolean +) : BaseStep("Seeker") { - private val log = Logger("Seeker") override val channel = Channel private val positions = positions.toMutableList() - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { if (positions.isNotEmpty()) { if (seek(positions.first())) { log.i("Seeking to next position ${positions.first()}") diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Writer.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Writer.kt index ce2a67bf..323edbf0 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Writer.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/data/Writer.kt @@ -3,6 +3,7 @@ package com.otaliastudios.transcoder.internal.data import android.media.MediaCodec import android.media.MediaFormat import com.otaliastudios.transcoder.common.TrackType +import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.State import com.otaliastudios.transcoder.internal.pipeline.Step @@ -22,13 +23,12 @@ internal interface WriterChannel : Channel { } internal class Writer( - private val sink: DataSink, - private val track: TrackType -) : Step, WriterChannel { + private val sink: DataSink, + private val track: TrackType +) : BaseStep("Writer"), WriterChannel { override val channel = this - private val log = Logger("Writer") private val info = MediaCodec.BufferInfo() override fun handleFormat(format: MediaFormat) { @@ -36,7 +36,7 @@ internal class Writer( sink.setTrackFormat(track, format) } - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { val (buffer, timestamp, flags) = state.value // Note: flags does NOT include BUFFER_FLAG_END_OF_STREAM. That's passed via State.Eos. val eos = state is State.Eos @@ -48,10 +48,10 @@ internal class Writer( info.set(0, 0, 0, flags or MediaCodec.BUFFER_FLAG_END_OF_STREAM) } else { info.set( - buffer.position(), - buffer.remaining(), - timestamp, - flags + buffer.position(), + buffer.remaining(), + timestamp, + flags ) } sink.writeTrack(track, buffer, info) diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaCodecBuffers.java b/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaCodecBuffers.java deleted file mode 100644 index fe09ff32..00000000 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaCodecBuffers.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.otaliastudios.transcoder.internal.media; - -import android.media.MediaCodec; -import android.os.Build; - -import androidx.annotation.NonNull; - -import java.nio.ByteBuffer; - -/** - * A Wrapper to MediaCodec that facilitates the use of API-dependent get{Input/Output}Buffer methods, - * in order to prevent: http://stackoverflow.com/q/30646885 - */ -public class MediaCodecBuffers { - - private final MediaCodec mMediaCodec; - private final ByteBuffer[] mInputBuffers; - private ByteBuffer[] mOutputBuffers; - - public MediaCodecBuffers(@NonNull MediaCodec mediaCodec) { - mMediaCodec = mediaCodec; - - if (Build.VERSION.SDK_INT < 21) { - mInputBuffers = mediaCodec.getInputBuffers(); - mOutputBuffers = mediaCodec.getOutputBuffers(); - } else { - mInputBuffers = mOutputBuffers = null; - } - } - - @NonNull - public ByteBuffer getInputBuffer(final int index) { - if (Build.VERSION.SDK_INT >= 21) { - // This is nullable only for incorrect usage. - return mMediaCodec.getInputBuffer(index); - } - ByteBuffer result = mInputBuffers[index]; - result.clear(); - return result; - } - - @NonNull - public ByteBuffer getOutputBuffer(final int index) { - if (Build.VERSION.SDK_INT >= 21) { - // This is nullable only for incorrect usage. - return mMediaCodec.getOutputBuffer(index); - } - return mOutputBuffers[index]; - } - - public void onOutputBuffersChanged() { - if (Build.VERSION.SDK_INT < 21) { - mOutputBuffers = mMediaCodec.getOutputBuffers(); - } - } -} diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaFormatProvider.java b/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaFormatProvider.java index fd0c5f31..d642ec81 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaFormatProvider.java +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/media/MediaFormatProvider.java @@ -99,12 +99,11 @@ private MediaFormat decodeMediaFormat(@NonNull DataSource source, throw new RuntimeException("Can't decode this track", e); } decoder.start(); - MediaCodecBuffers buffers = new MediaCodecBuffers(decoder); MediaCodec.BufferInfo info = new MediaCodec.BufferInfo(); DataSource.Chunk chunk = new DataSource.Chunk(); MediaFormat result = null; while (result == null) { - result = decodeOnce(type, source, chunk, decoder, buffers, info); + result = decodeOnce(type, source, chunk, decoder, info); } source.deinitialize(); source.initialize(); @@ -116,18 +115,16 @@ private MediaFormat decodeOnce(@NonNull TrackType type, @NonNull DataSource source, @NonNull DataSource.Chunk chunk, @NonNull MediaCodec decoder, - @NonNull MediaCodecBuffers buffers, @NonNull MediaCodec.BufferInfo info) { // First drain then feed. - MediaFormat format = drainOnce(decoder, buffers, info); + MediaFormat format = drainOnce(decoder, info); if (format != null) return format; - feedOnce(type, source, chunk, decoder, buffers); + feedOnce(type, source, chunk, decoder); return null; } @Nullable private MediaFormat drainOnce(@NonNull MediaCodec decoder, - @NonNull MediaCodecBuffers buffers, @NonNull MediaCodec.BufferInfo info) { int result = decoder.dequeueOutputBuffer(info, 0); switch (result) { @@ -136,8 +133,7 @@ private MediaFormat drainOnce(@NonNull MediaCodec decoder, case MediaCodec.INFO_OUTPUT_FORMAT_CHANGED: return decoder.getOutputFormat(); case MediaCodec.INFO_OUTPUT_BUFFERS_CHANGED: - buffers.onOutputBuffersChanged(); - return drainOnce(decoder, buffers, info); + return drainOnce(decoder, info); default: // Drop this data immediately. decoder.releaseOutputBuffer(result, false); return null; @@ -147,14 +143,13 @@ private MediaFormat drainOnce(@NonNull MediaCodec decoder, private void feedOnce(@NonNull TrackType type, @NonNull DataSource source, @NonNull DataSource.Chunk chunk, - @NonNull MediaCodec decoder, - @NonNull MediaCodecBuffers buffers) { + @NonNull MediaCodec decoder) { if (!source.canReadTrack(type)) { throw new RuntimeException("This should never happen!"); } final int result = decoder.dequeueInputBuffer(0); if (result < 0) return; - chunk.buffer = buffers.getInputBuffer(result); + chunk.buffer = decoder.getInputBuffer(result); source.readTrack(chunk); decoder.queueInputBuffer(result, chunk.buffer.position(), diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt index ea846a70..85fd5ab0 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Pipeline.kt @@ -3,65 +3,178 @@ package com.otaliastudios.transcoder.internal.pipeline import com.otaliastudios.transcoder.internal.utils.Logger -private typealias AnyStep = Step -internal class Pipeline private constructor(name: String, private val chain: List) { +private class PipelineItem( + val step: Step, + val name: String, +) { + // var success: State.Ok? = null + // var failure: State.Retry? = null + val unhandled = ArrayDeque>() + var done = false + var advanced = false + var packets = 0 + private var nextUnhandled: ArrayDeque>? = null - private val log = Logger("Pipeline($name)") - private var headState: State.Ok = State.Ok(Unit) - private var headIndex = 0 + fun attachToNext(next: PipelineItem) { + nextUnhandled = next.unhandled + step.initialize(next = next.step.channel) + } - init { - chain.zipWithNext().reversed().forEach { (first, next) -> - first.initialize(next = next.channel) + fun canHandle(first: Boolean): Boolean { + if (done) return false + if (first) { + unhandled.clear() + unhandled.addLast(State.Ok(Unit)) + } + return unhandled.isNotEmpty() || step is QueuedStep + } + + fun handle(): State.Failure? { + advanced = false + while (unhandled.isNotEmpty() && !done) { + val input = unhandled.removeFirst() + when (val result = step.advance(input)) { + is State.Ok -> { + packets++ + advanced = true + done = result is State.Eos + nextUnhandled?.addLast(result) + } + is State.Retry -> { + unhandled.addFirst(input) + return result + } + is State.Consume -> return result + } } + if (!advanced && !done && step is QueuedStep) { + when (val result = step.tryAdvance()) { + is State.Ok -> { + packets++ + advanced = true + done = result is State.Eos + nextUnhandled?.addLast(result) + } + is State.Failure -> return result + } + } + return null + } +} + +internal class Pipeline private constructor(name: String, private val items: List) { + + private val log = Logger(name) + + init { + items.zipWithNext().reversed().forEach { (first, next) -> first.attachToNext(next) } } - // Returns Eos, Ok or Wait fun execute(): State { - log.v("execute(): starting. head=$headIndex steps=${chain.size} remaining=${chain.size - headIndex}") - val head = headIndex - var state = headState - chain.forEachIndexed { index, step -> - if (index < head) return@forEachIndexed - val fresh = head == 0 || index != head - - fun executeStep(fresh: Boolean): State.Wait? { - return when (val newState = step.step(state, fresh)) { - is State.Eos -> { - state = newState - log.i("execute(): EOS from ${step.name} (#$index/${chain.size}).") - headState = newState - headIndex = index + 1 - null - } - is State.Ok -> { - state = newState - null - } - is State.Retry -> executeStep(fresh = false) - is State.Wait -> return newState + log.v("LOOP") + var advanced = false + var sleeps = false + + for (i in items.indices) { + val item = items[i] + + if (item.canHandle(i == 0)) { + log.v("${item.name} START #${item.packets} (${item.unhandled.size} pending)") + val failure = item.handle() + if (failure != null) { + sleeps = sleeps || failure.sleep + log.v("${item.name} FAILED #${item.packets}") + } else { + log.v("${item.name} SUCCESS #${item.packets} ${if (item.done) "(eos)" else ""}") + } + advanced = advanced || item.advanced + } else { + log.v("${item.name} SKIP #${item.packets} ${if (item.done) "(eos)" else ""}") + } + } + return when { + items.isEmpty() -> State.Eos(Unit) + items.last().done -> State.Eos(Unit) + advanced -> State.Ok(Unit) + else -> State.Retry(sleeps) + } + } + + /*fun execute_OLD(): State { + var headState: State.Ok = State.Ok(Unit) + var headFresh = true + // In case of failure in the previous run, we should re-run all items before the failed one + // This is important for decoders/encoders that need more input before they can output + val previouslyFailedIndex = items.indexOfLast { it.failure != null }.takeIf { it >= 0 } + log.v("LOOP (previouslyFailed: ${previouslyFailedIndex})") + for (i in items.indices) { + val item = items[i] + val cached = item.success + val skip = cached is State.Eos || (!headFresh && cached != null) + if (skip) { + // Note: we could consider a retry() on queued steps here but it's risky + // because the current 'cached' value may have never been handled by the next item + log.v("${i+1}/${items.size} '${item.step.name}' SKIP ${if (cached is State.Eos) "(eos)" else "(handled)"}") + headState = cached!! + headFresh = false + continue + } + // This item did not succeed at the last loop, or we have fresh input data. + log.v("${i+1}/${items.size} '${item.step.name}' START (${if (headFresh) "fresh" else "stale"})") + val result = when { + !headFresh && item.step is QueuedStep -> item.step.retry() // queued steps should never get stale data + else -> item.step.advance(headState) + } + item.set(result) + if (result is State.Ok) { + log.v("${i+1}/${items.size} '${item.step.name}' SUCCESS ${if (result is State.Eos) "(eos)" else ""}") + headState = result + headFresh = true + if (i == items.lastIndex) items.forEach { + if (it.success !is State.Eos) it.set(null) + } + continue + } + // Item failed. Check if we had a later failure in the previous run. In that case + // we should retry that too. Note: `cached` should always be not null in this branch + // but let's avoid throwing + if (previouslyFailedIndex != null && i < previouslyFailedIndex) { + if (cached != null) { + log.v("${i+1}/${items.size} '${item.step.name}' FAILED (skip)") + item.set(cached) // keep 'cached' for next run + headState = cached + headFresh = false + continue } } - val wait = executeStep(fresh) - if (wait != null) return State.Wait(wait.sleep) + // Item failed: don't proceed. Return early. + log.v("${i+1}/${items.size} '${item.step.name}' FAILED") + return State.Wait(item.failure!!.sleep) } return when { - chain.isEmpty() -> State.Eos(Unit) - state is State.Eos -> State.Eos(Unit) + items.isEmpty() -> State.Eos(Unit) + headState is State.Eos -> State.Eos(Unit) else -> State.Ok(Unit) } - } + } */ fun release() { - chain.forEach { it.release() } + items.forEach { it.step.release() } } companion object { - @Suppress("UNCHECKED_CAST") - internal fun build(name: String, builder: () -> Builder<*, Channel> = { Builder() }): Pipeline { - return Pipeline(name, builder().steps as List) + internal fun build(name: String, debug: String? = null, builder: () -> Builder<*, Channel> = { Builder() }): Pipeline { + val steps = builder().steps + val items = steps.mapIndexed { index, step -> + @Suppress("UNCHECKED_CAST") + PipelineItem( + step = step as Step, + name = "${index+1}/${steps.size} '${step.name}'" + ) + } + return Pipeline("${name}Pipeline${debug ?: ""}", items) } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt index 8c71b741..2ffc6c51 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/State.kt @@ -1,9 +1,9 @@ package com.otaliastudios.transcoder.internal.pipeline -internal sealed class State { +internal sealed interface State { // Running - open class Ok(val value: T) : State() { + open class Ok(val value: T) : State { override fun toString() = "State.Ok($value)" } @@ -12,13 +12,16 @@ internal sealed class State { override fun toString() = "State.Eos($value)" } - // couldn't run, but might in the future - class Wait(val sleep: Boolean) : State() { - override fun toString() = "State.Wait($sleep)" + // Failed to produce output, try again later + sealed interface Failure : State { + val sleep: Boolean } - // call again as soon as possible - object Retry : State() { - override fun toString() = "State.Retry" + class Retry(override val sleep: Boolean) : Failure { + override fun toString() = "State.Retry($sleep)" + } + + class Consume(override val sleep: Boolean = false) : Failure { + override fun toString() = "State.Consume($sleep)" } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Step.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Step.kt index 7159182b..cc234575 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Step.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/Step.kt @@ -6,18 +6,17 @@ internal interface Channel { } internal interface Step< - Input: Any, - InputChannel: Channel, - Output: Any, - OutputChannel: Channel + Input: Any, + InputChannel: Channel, + Output: Any, + OutputChannel: Channel > { + val name: String val channel: InputChannel fun initialize(next: OutputChannel) = Unit - fun step(state: State.Ok, fresh: Boolean): State + fun advance(state: State.Ok): State fun release() = Unit } - -internal val Step<*, *, *, *>.name get() = this::class.simpleName \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/pipelines.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/pipelines.kt index e0a4b33e..6f46892d 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/pipelines.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/pipelines.kt @@ -22,11 +22,11 @@ import com.otaliastudios.transcoder.time.TimeInterpolator internal fun EmptyPipeline() = Pipeline.build("Empty") internal fun PassThroughPipeline( - track: TrackType, - source: DataSource, - sink: DataSink, - interpolator: TimeInterpolator -) = Pipeline.build("PassThrough($track)") { + track: TrackType, + source: DataSource, + sink: DataSink, + interpolator: TimeInterpolator +) = Pipeline.build("PassThrough$track") { Reader(source, track) + ReaderTimer(track, interpolator) + Bridge(source.getTrackFormat(track)!!) + @@ -34,28 +34,30 @@ internal fun PassThroughPipeline( } internal fun RegularPipeline( - track: TrackType, - source: DataSource, - sink: DataSink, - interpolator: TimeInterpolator, - format: MediaFormat, - codecs: Codecs, - videoRotation: Int, - audioStretcher: AudioStretcher, - audioResampler: AudioResampler + track: TrackType, + debug: String?, + source: DataSource, + sink: DataSink, + interpolator: TimeInterpolator, + format: MediaFormat, + codecs: Codecs, + videoRotation: Int, + audioStretcher: AudioStretcher, + audioResampler: AudioResampler ) = when (track) { - TrackType.VIDEO -> VideoPipeline(source, sink, interpolator, format, codecs, videoRotation) - TrackType.AUDIO -> AudioPipeline(source, sink, interpolator, format, codecs, audioStretcher, audioResampler) + TrackType.VIDEO -> VideoPipeline(debug, source, sink, interpolator, format, codecs, videoRotation) + TrackType.AUDIO -> AudioPipeline(debug, source, sink, interpolator, format, codecs, audioStretcher, audioResampler) } private fun VideoPipeline( - source: DataSource, - sink: DataSink, - interpolator: TimeInterpolator, - format: MediaFormat, - codecs: Codecs, - videoRotation: Int -) = Pipeline.build("Video") { + debug: String?, + source: DataSource, + sink: DataSink, + interpolator: TimeInterpolator, + format: MediaFormat, + codecs: Codecs, + videoRotation: Int +) = Pipeline.build("Video", debug) { Reader(source, TrackType.VIDEO) + Decoder(source.getTrackFormat(TrackType.VIDEO)!!, true) + DecoderTimer(TrackType.VIDEO, interpolator) + @@ -66,14 +68,15 @@ private fun VideoPipeline( } private fun AudioPipeline( - source: DataSource, - sink: DataSink, - interpolator: TimeInterpolator, - format: MediaFormat, - codecs: Codecs, - audioStretcher: AudioStretcher, - audioResampler: AudioResampler -) = Pipeline.build("Audio") { + debug: String?, + source: DataSource, + sink: DataSink, + interpolator: TimeInterpolator, + format: MediaFormat, + codecs: Codecs, + audioStretcher: AudioStretcher, + audioResampler: AudioResampler +) = Pipeline.build("Audio", debug) { Reader(source, TrackType.AUDIO) + Decoder(source.getTrackFormat(TrackType.AUDIO)!!, true) + DecoderTimer(TrackType.AUDIO, interpolator) + diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt index 9afe4ef7..8b39aa7f 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/pipeline/steps.kt @@ -1,11 +1,16 @@ package com.otaliastudios.transcoder.internal.pipeline +import com.otaliastudios.transcoder.internal.utils.Logger + internal abstract class BaseStep< - Input: Any, - InputChannel: Channel, - Output: Any, - OutputChannel: Channel -> : Step { + Input: Any, + InputChannel: Channel, + Output: Any, + OutputChannel: Channel +>(final override val name: String) : Step { + + protected val log = Logger(name) + protected lateinit var next: OutputChannel private set @@ -14,19 +19,20 @@ internal abstract class BaseStep< } } -internal abstract class DataStep : Step { +internal abstract class TransformStep(name: String) : BaseStep(name) { override lateinit var channel: C override fun initialize(next: C) { + super.initialize(next) channel = next } } internal abstract class QueuedStep< - Input: Any, - InputChannel: Channel, - Output: Any, - OutputChannel: Channel -> : BaseStep() { + Input: Any, + InputChannel: Channel, + Output: Any, + OutputChannel: Channel +>(name: String) : BaseStep(name) { protected abstract fun enqueue(data: Input) @@ -34,11 +40,18 @@ internal abstract class QueuedStep< protected abstract fun drain(): State - final override fun step(state: State.Ok, fresh: Boolean): State { - if (fresh) { - if (state is State.Eos) enqueueEos(state.value) - else enqueue(state.value) + final override fun advance(state: State.Ok): State { + if (state is State.Eos) enqueueEos(state.value) + else enqueue(state.value) + // Disallow State.Retry because the input was already handled. + return when (val result = drain()) { + is State.Retry -> State.Consume(result.sleep) + else -> result } + } + + fun tryAdvance(): State { return drain() } + } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/thumbnails/DefaultThumbnailsEngine.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/thumbnails/DefaultThumbnailsEngine.kt index e7fa5e3d..13a07bb6 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/thumbnails/DefaultThumbnailsEngine.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/thumbnails/DefaultThumbnailsEngine.kt @@ -78,6 +78,7 @@ internal class DefaultThumbnailsEngine( private fun createPipeline( type: TrackType, index: Int, + count: Int, status: TrackStatus, outputFormat: MediaFormat ): Pipeline { diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/transcode/DefaultTranscodeEngine.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/transcode/DefaultTranscodeEngine.kt index 393e65d4..260894e2 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/transcode/DefaultTranscodeEngine.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/transcode/DefaultTranscodeEngine.kt @@ -63,6 +63,7 @@ internal class DefaultTranscodeEngine( private fun createPipeline( type: TrackType, index: Int, + count: Int, status: TrackStatus, outputFormat: MediaFormat ): Pipeline { @@ -79,7 +80,7 @@ internal class DefaultTranscodeEngine( TrackStatus.ABSENT -> EmptyPipeline() TrackStatus.REMOVING -> EmptyPipeline() TrackStatus.PASS_THROUGH -> PassThroughPipeline(type, source, sink, interpolator) - TrackStatus.COMPRESSING -> RegularPipeline(type, + TrackStatus.COMPRESSING -> RegularPipeline(type, if (count > 1) "${index+1}/$count" else null, source, sink, interpolator, outputFormat, codecs, videoRotation, audioStretcher, audioResampler) } @@ -114,7 +115,7 @@ internal class DefaultTranscodeEngine( val advanced = (audio?.advance() ?: false) or (video?.advance() ?: false) val completed = !advanced && !segments.hasNext() // avoid calling hasNext if we advanced. - log.v("transcode(): executed step=$loop advanced=$advanced completed=$completed") + log.v("iteration #$loop audio=${segments.currentIndex.audio+1}/${dataSources.audio.size} video=${segments.currentIndex.video+1}/${dataSources.video.size} advanced=$advanced completed=$completed") if (Thread.interrupted()) { throw InterruptedException() } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/utils/Logger.java b/lib/src/main/java/com/otaliastudios/transcoder/internal/utils/Logger.java index 50c0b4e5..43c0c291 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/utils/Logger.java +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/utils/Logger.java @@ -21,7 +21,7 @@ public class Logger { @SuppressWarnings("WeakerAccess") public final static int LEVEL_ERROR = 3; - private static int sLevel; + private static int sLevel = LEVEL_INFO; /** * Interface of integers representing log levels. @@ -36,9 +36,16 @@ public class Logger { public @interface LogLevel {} private final String mTag; + private final int mLevel; public Logger(@NonNull String tag) { mTag = tag; + mLevel = sLevel; + } + + public Logger(@NonNull String tag, int level) { + mTag = tag; + mLevel = level; } /** @@ -55,7 +62,7 @@ public static void setLogLevel(@LogLevel int logLevel) { } private boolean should(int messageLevel) { - return sLevel <= messageLevel; + return mLevel <= messageLevel; } public void v(String message) { v(message, null); } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt index 1e9679bd..a7773a22 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoPublisher.kt @@ -5,36 +5,29 @@ import com.otaliastudios.opengl.core.EglCore import com.otaliastudios.opengl.surface.EglWindowSurface import com.otaliastudios.transcoder.internal.codec.EncoderChannel import com.otaliastudios.transcoder.internal.codec.EncoderData +import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.State import com.otaliastudios.transcoder.internal.pipeline.Step -internal class VideoPublisher: Step { +internal class VideoPublisher: BaseStep("VideoPublisher") { override val channel = Channel - private val core = EglCore(EGL14.EGL_NO_CONTEXT, EglCore.FLAG_RECORDABLE) - private lateinit var surface: EglWindowSurface - - override fun initialize(next: EncoderChannel) { - super.initialize(next) - surface = EglWindowSurface(core, next.surface!!, false) - surface.makeCurrent() - } - - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { if (state is State.Eos) { return State.Eos(EncoderData.Empty) } else { - surface.setPresentationTime(state.value * 1000) - surface.swapBuffers() + val surface = next.surface!! + surface.window.setPresentationTime(state.value * 1000) + surface.window.swapBuffers() + /* val s = EGL14.eglGetCurrentSurface(EGL14.EGL_DRAW) + val ss = IntArray(2) + EGL14.eglQuerySurface(EGL14.eglGetCurrentDisplay(), s, EGL14.EGL_WIDTH, ss, 0) + EGL14.eglQuerySurface(EGL14.eglGetCurrentDisplay(), s, EGL14.EGL_HEIGHT, ss, 1) + log.e("XXX VideoPublisher.surfaceSize: ${ss[0]}x${ss[1]}") */ return State.Ok(EncoderData.Empty) } } - - override fun release() { - surface.release() - core.release() - } } \ No newline at end of file diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt index 04a20b98..237eb4f6 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoRenderer.kt @@ -6,20 +6,17 @@ import android.view.Surface import com.otaliastudios.transcoder.internal.codec.DecoderChannel import com.otaliastudios.transcoder.internal.codec.DecoderData import com.otaliastudios.transcoder.internal.media.MediaFormatConstants.KEY_ROTATION_DEGREES +import com.otaliastudios.transcoder.internal.pipeline.BaseStep import com.otaliastudios.transcoder.internal.pipeline.Channel import com.otaliastudios.transcoder.internal.pipeline.State -import com.otaliastudios.transcoder.internal.pipeline.Step -import com.otaliastudios.transcoder.internal.utils.Logger internal class VideoRenderer( - private val sourceRotation: Int, // intrinsic source rotation - private val extraRotation: Int, // any extra rotation in TranscoderOptions - private val targetFormat: MediaFormat, - flipY: Boolean = false -): Step, DecoderChannel { - - private val log = Logger("VideoRenderer") + private val sourceRotation: Int, // intrinsic source rotation + private val extraRotation: Int, // any extra rotation in TranscoderOptions + private val targetFormat: MediaFormat, + flipY: Boolean = false +): BaseStep("VideoRenderer"), DecoderChannel { override val channel = this @@ -40,14 +37,17 @@ internal class VideoRenderer( val width = targetFormat.getInteger(KEY_WIDTH) val height = targetFormat.getInteger(KEY_HEIGHT) val flip = extraRotation % 180 != 0 - log.e("FrameDrawerEncoder: size=$width-$height, flipping=$flip") - targetFormat.setInteger(KEY_WIDTH, if (flip) height else width) - targetFormat.setInteger(KEY_HEIGHT, if (flip) width else height) + val flippedWidth = if (flip) height else width + val flippedHeight = if (flip) width else height + targetFormat.setInteger(KEY_WIDTH, flippedWidth) + targetFormat.setInteger(KEY_HEIGHT, flippedHeight) + log.i("encoded output format: $targetFormat") + log.i("output size=${flippedWidth}x${flippedHeight}, flipped=$flip") } // VideoTrackTranscoder.onConfigureDecoder override fun handleSourceFormat(sourceFormat: MediaFormat): Surface { - log.i("handleSourceFormat($sourceFormat)") + log.i("encoded input format: $sourceFormat") // Just a sanity check that the rotation coming from DataSource is not different from // the one found in the DataSource's MediaFormat for video. @@ -89,9 +89,11 @@ internal class VideoRenderer( return frameDrawer.surface } - override fun handleRawFormat(rawFormat: MediaFormat) = Unit + override fun handleRawFormat(rawFormat: MediaFormat) { + log.i("decoded input format: $rawFormat") + } - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { return if (state is State.Eos) { state.value.release(false) State.Eos(0L) @@ -102,7 +104,7 @@ internal class VideoRenderer( State.Ok(state.value.timeUs) } else { state.value.release(false) - State.Wait(false) + State.Consume() } } } diff --git a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoSnapshots.kt b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoSnapshots.kt index e8ba3e35..09fd5c28 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoSnapshots.kt +++ b/lib/src/main/java/com/otaliastudios/transcoder/internal/video/VideoSnapshots.kt @@ -20,13 +20,12 @@ import java.nio.ByteOrder import kotlin.math.abs internal class VideoSnapshots( - format: MediaFormat, - requests: List, - private val accuracyUs: Long, - private val onSnapshot: (Long, Bitmap) -> Unit -) : BaseStep() { + format: MediaFormat, + requests: List, + private val accuracyUs: Long, + private val onSnapshot: (Long, Bitmap) -> Unit +) : BaseStep("VideoSnapshots") { - private val log = Logger("VideoSnapshots") override val channel = Channel private val requests = requests.toMutableList() private val width = format.getInteger(KEY_WIDTH) @@ -36,7 +35,7 @@ internal class VideoSnapshots( it.makeCurrent() } - override fun step(state: State.Ok, fresh: Boolean): State { + override fun advance(state: State.Ok): State { if (requests.isEmpty()) return state val expectedUs = requests.first() diff --git a/lib/src/main/java/com/otaliastudios/transcoder/sink/DefaultDataSink.java b/lib/src/main/java/com/otaliastudios/transcoder/sink/DefaultDataSink.java index afa5bee1..93333027 100644 --- a/lib/src/main/java/com/otaliastudios/transcoder/sink/DefaultDataSink.java +++ b/lib/src/main/java/com/otaliastudios/transcoder/sink/DefaultDataSink.java @@ -12,6 +12,8 @@ import com.otaliastudios.transcoder.common.TrackType; import com.otaliastudios.transcoder.internal.utils.MutableTrackMap; import com.otaliastudios.transcoder.internal.utils.Logger; +import com.otaliastudios.transcoder.time.MonotonicTimeInterpolator; +import com.otaliastudios.transcoder.time.TimeInterpolator; import java.io.FileDescriptor; import java.io.IOException; @@ -64,6 +66,7 @@ private QueuedSample(@NonNull TrackType type, private final MutableTrackMap mLastFormat = mutableTrackMapOf(null); private final MutableTrackMap mMuxerIndex = mutableTrackMapOf(null); private final DefaultDataSinkChecks mMuxerChecks = new DefaultDataSinkChecks(); + private final TimeInterpolator mInterpolator = new MonotonicTimeInterpolator(); public DefaultDataSink(@NonNull String outputFilePath) { this(outputFilePath, MediaMuxer.OutputFormat.MUXER_OUTPUT_MPEG_4); @@ -149,6 +152,9 @@ private void maybeStart() { @Override public void writeTrack(@NonNull TrackType type, @NonNull ByteBuffer byteBuffer, @NonNull MediaCodec.BufferInfo bufferInfo) { if (mMuxerStarted) { + if (bufferInfo.presentationTimeUs != 0) { + bufferInfo.presentationTimeUs = mInterpolator.interpolate(type, bufferInfo.presentationTimeUs); + } /* LOG.v("writeTrack(" + type + "): offset=" + bufferInfo.offset + "\trealOffset=" + byteBuffer.position() + "\tsize=" + bufferInfo.size diff --git a/lib/src/main/java/com/otaliastudios/transcoder/time/MonotonicTimeInterpolator.kt b/lib/src/main/java/com/otaliastudios/transcoder/time/MonotonicTimeInterpolator.kt new file mode 100644 index 00000000..0f230976 --- /dev/null +++ b/lib/src/main/java/com/otaliastudios/transcoder/time/MonotonicTimeInterpolator.kt @@ -0,0 +1,28 @@ +package com.otaliastudios.transcoder.time + +import android.media.MediaMuxer +import com.otaliastudios.transcoder.common.TrackType +import com.otaliastudios.transcoder.internal.utils.mutableTrackMapOf + +/** + * A [TimeInterpolator] that ensures timestamps are monotonically increasing. + * Timestamps can go back and forth for many reasons, like miscalculations in MediaCodec output + * or manually generated timestamps, or at the boundary between one data source and another. + * + * Since [MediaMuxer.writeSampleData] can throw in case of invalid timestamps, this interpolator + * ensures that the next timestamp is at least equal to the previous timestamp plus 1. + * It does no effort to preserve the input deltas, so the input stream must be as consistent as possible. + * + * For example, 20 30 40 50 10 20 30 would become 20 30 40 50 51 52 53. + */ +internal class MonotonicTimeInterpolator : TimeInterpolator { + private val last = mutableTrackMapOf(Long.MIN_VALUE, Long.MIN_VALUE) + override fun interpolate(type: TrackType, time: Long): Long { + return interpolate(last[type], time).also { last[type] = it } + } + private fun interpolate(prev: Long, next: Long): Long { + if (prev == Long.MIN_VALUE) return next + return next.coerceAtLeast(prev + 1) + } + +} \ No newline at end of file