-
Notifications
You must be signed in to change notification settings - Fork 5.9k
perf(recorder): wrap MJPEG frames in MKV with timestamps #41191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| /** | ||
| * Copyright (c) Microsoft Corporation. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| // Minimal EBML/Matroska writer used to wrap individual MJPEG frames with explicit | ||
| // timestamps before piping them into ffmpeg (`-f matroska -i pipe:0`). This lets ffmpeg | ||
| // derive frame timing from the stream instead of us repeating frames to fake a constant | ||
| // frame rate. Only the subset of Matroska needed for a single live MJPEG track is emitted. | ||
| // | ||
| // References: | ||
| // https://www.matroska.org/technical/elements.html | ||
| // https://datatracker.ietf.org/doc/html/rfc8794 (EBML) | ||
|
|
||
| // Element IDs are written verbatim - the leading byte already encodes the length descriptor. | ||
| const kEBML = Buffer.from('1A45DFA3', 'hex'); | ||
| const kEBMLVersion = Buffer.from('4286', 'hex'); | ||
| const kEBMLReadVersion = Buffer.from('42F7', 'hex'); | ||
| const kEBMLMaxIDLength = Buffer.from('42F2', 'hex'); | ||
| const kEBMLMaxSizeLength = Buffer.from('42F3', 'hex'); | ||
| const kDocType = Buffer.from('4282', 'hex'); | ||
| const kDocTypeVersion = Buffer.from('4287', 'hex'); | ||
| const kDocTypeReadVersion = Buffer.from('4285', 'hex'); | ||
| const kSegment = Buffer.from('18538067', 'hex'); | ||
| const kInfo = Buffer.from('1549A966', 'hex'); | ||
| const kTimestampScale = Buffer.from('2AD7B1', 'hex'); | ||
| const kMuxingApp = Buffer.from('4D80', 'hex'); | ||
| const kWritingApp = Buffer.from('5741', 'hex'); | ||
| const kTracks = Buffer.from('1654AE6B', 'hex'); | ||
| const kTrackEntry = Buffer.from('AE', 'hex'); | ||
| const kTrackNumber = Buffer.from('D7', 'hex'); | ||
| const kTrackUID = Buffer.from('73C5', 'hex'); | ||
| const kTrackType = Buffer.from('83', 'hex'); | ||
| const kFlagLacing = Buffer.from('9C', 'hex'); | ||
| const kCodecID = Buffer.from('86', 'hex'); | ||
| const kVideo = Buffer.from('E0', 'hex'); | ||
| const kPixelWidth = Buffer.from('B0', 'hex'); | ||
| const kPixelHeight = Buffer.from('BA', 'hex'); | ||
| const kCluster = Buffer.from('1F43B675', 'hex'); | ||
| const kTimestamp = Buffer.from('E7', 'hex'); | ||
| const kSimpleBlock = Buffer.from('A3', 'hex'); | ||
|
|
||
| // "Unknown size" for a streaming Segment: an 8-byte EBML vint with all data bits set. | ||
| const kUnknownSize = Buffer.from([0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]); | ||
|
|
||
| // Encodes a value as an EBML variable-length size integer (vint): the leading bits select | ||
| // the byte length and are followed by the big-endian value. | ||
| function vint(value: number): Buffer { | ||
| let length = 1; | ||
| while (value >= 2 ** (7 * length) - 1) | ||
| ++length; | ||
| const buffer = Buffer.alloc(length); | ||
| let v = value; | ||
| for (let i = length - 1; i >= 0; --i) { | ||
| buffer[i] = v & 0xff; | ||
| v = Math.floor(v / 256); | ||
| } | ||
| buffer[0] |= 1 << (8 - length); | ||
| return buffer; | ||
| } | ||
|
|
||
| // Encodes a non-negative integer as a minimal big-endian byte sequence. | ||
| function uint(value: number): Buffer { | ||
| if (value === 0) | ||
| return Buffer.from([0]); | ||
| const bytes: number[] = []; | ||
| let v = value; | ||
| while (v > 0) { | ||
| bytes.unshift(v & 0xff); | ||
| v = Math.floor(v / 256); | ||
| } | ||
| return Buffer.from(bytes); | ||
| } | ||
|
|
||
| // A complete EBML element: id + size-as-vint + payload. | ||
| function element(id: Buffer, payload: Buffer): Buffer { | ||
| return Buffer.concat([id, vint(payload.length), payload]); | ||
| } | ||
|
|
||
| // Emits the Matroska header: EBML head, an unknown-size (streaming) Segment, stream Info with a | ||
| // 1ms timestamp scale, and a single MJPEG video track. Frames follow as Clusters via writeClusterHeader. | ||
| export function writeHeader(width: number, height: number): Buffer { | ||
| const ebml = element(kEBML, Buffer.concat([ | ||
| element(kEBMLVersion, uint(1)), | ||
| element(kEBMLReadVersion, uint(1)), | ||
| element(kEBMLMaxIDLength, uint(4)), | ||
| element(kEBMLMaxSizeLength, uint(8)), | ||
| element(kDocType, Buffer.from('matroska')), | ||
| element(kDocTypeVersion, uint(4)), | ||
| element(kDocTypeReadVersion, uint(2)), | ||
| ])); | ||
| const info = element(kInfo, Buffer.concat([ | ||
| // TimestampScale in nanoseconds per tick: 1_000_000 => timestamps are expressed in milliseconds. | ||
| element(kTimestampScale, uint(1000000)), | ||
| element(kMuxingApp, Buffer.from('playwright')), | ||
| element(kWritingApp, Buffer.from('playwright')), | ||
| ])); | ||
| const track = element(kTrackEntry, Buffer.concat([ | ||
| element(kTrackNumber, uint(1)), | ||
| element(kTrackUID, uint(1)), | ||
| element(kTrackType, uint(1)), // 1 = video. | ||
| element(kFlagLacing, uint(0)), | ||
| element(kCodecID, Buffer.from('V_MJPEG')), | ||
| // PixelWidth/PixelHeight are advisory: ffmpeg's mjpeg decoder uses the dimensions encoded in | ||
| // each JPEG frame, and the output video filters normalize to the requested size. | ||
| element(kVideo, Buffer.concat([ | ||
| element(kPixelWidth, uint(width)), | ||
| element(kPixelHeight, uint(height)), | ||
| ])), | ||
| ])); | ||
| const tracks = element(kTracks, track); | ||
| return Buffer.concat([ebml, kSegment, kUnknownSize, info, tracks]); | ||
| } | ||
|
|
||
| // Emits the bytes that precede a single MJPEG frame in its own Cluster, timestamped at the given | ||
| // absolute millisecond offset. The frame itself is NOT copied here - the caller writes this header | ||
| // followed by the raw frame buffer, so the (potentially large) JPEG is never duplicated. Each MJPEG | ||
| // frame is intra-coded, so it is its own keyframe in its own Cluster (relative timecode 0), which | ||
| // keeps timecodes within the SimpleBlock int16 range regardless of how long frames are apart. | ||
| export function writeClusterHeader(timestampMs: number, frameLength: number): Buffer { | ||
| // SimpleBlock payload = track number vint (1 byte) + relative timecode (2 bytes) + flags (1 byte) | ||
| // + the frame, which the caller appends. | ||
| const simpleBlockHeader = Buffer.concat([ | ||
| kSimpleBlock, | ||
| vint(4 + frameLength), | ||
| vint(1), // Track number (1). | ||
| Buffer.from([0x00, 0x00]), // Relative timecode (int16), always 0 within its own Cluster. | ||
| Buffer.from([0x80]), // Flags: keyframe. | ||
| ]); | ||
| const timestamp = element(kTimestamp, uint(timestampMs)); | ||
| const clusterPayloadLength = timestamp.length + simpleBlockHeader.length + frameLength; | ||
| return Buffer.concat([ | ||
| kCluster, | ||
| vint(clusterPayloadLength), | ||
| timestamp, | ||
| simpleBlockHeader, | ||
| ]); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import { debugLogger } from '@utils/debugLogger'; | |
| import { mkdirIfNeeded } from '@utils/fileUtils'; | ||
| import { monotonicTime } from '@isomorphic/time'; | ||
| import { Artifact } from './artifact'; | ||
| import { writeClusterHeader, writeHeader } from './ebml'; | ||
| import { registry } from './registry'; | ||
|
|
||
| import type * as types from './types'; | ||
|
|
@@ -99,11 +100,9 @@ class FfmpegVideoRecorder { | |
| private _size: types.Size; | ||
| private _process: ChildProcess | null = null; | ||
| private _gracefullyClose: (() => Promise<void>) | null = null; | ||
| private _lastWritePromise: Promise<void> = Promise.resolve(); | ||
| private _firstFrameTimestamp: number = 0; | ||
| private _lastFrame: { timestamp: number, frameNumber: number, buffer: Buffer } | null = null; | ||
| private _lastFrame: { timestamp: number, frameNumber: number, timestampMs: number, buffer: Buffer } | null = null; | ||
| private _lastWriteNodeTime: number = 0; | ||
| private _frameQueue: Buffer[] = []; | ||
| private _isStopped = false; | ||
| private _ffmpegPath: string; | ||
| private _launchPromise: Promise<Error | null>; | ||
|
|
@@ -147,21 +146,24 @@ class FfmpegVideoRecorder { | |
| // https://ffmpeg.org/ffmpeg-filters.html#pad-1 | ||
| // https://ffmpeg.org/ffmpeg-filters.html#crop | ||
| // | ||
| // We use "image2pipe" mode to pipe frames and get a single video - https://trac.ffmpeg.org/wiki/Slideshow | ||
| // "-f image2pipe -c:v mjpeg -i -" forces input to be read from standard input, and forces | ||
| // mjpeg input image format. | ||
| // "-avioflags direct" reduces general buffering. | ||
| // We wrap each incoming MJPEG frame into a minimal Matroska stream (see ./ebml.ts) with an | ||
| // explicit timestamp, and let ffmpeg read frame timing from that stream. | ||
| // "-f matroska -i pipe:0" forces input to be read from standard input as Matroska. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we compile ffmpeg with matroska supprot? I thought we included only minimal required tool set.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do! WebM uses Matroska so it's in our binary already. |
||
| // "-fpsprobesize 0 -probesize 32 -analyzeduration 0" reduces initial buffering | ||
| // while analyzing input fps and other stats. | ||
| // Note: "-avioflags direct" must NOT be used here - it breaks Matroska header parsing | ||
| // by disabling the input buffering the demuxer needs. | ||
| // | ||
| // "-y" means overwrite output. | ||
| // "-an" means no audio. | ||
| // "-r 25" forces a constant output frame rate; ffmpeg duplicates frames as needed based on | ||
| // the input timestamps, so we don't have to repeat frames ourselves. | ||
| // "-threads 1" means using one thread. This drastically reduces stalling when | ||
| // cpu is overbooked. By default vp8 tries to use all available threads? | ||
|
|
||
| const w = this._size.width; | ||
| const h = this._size.height; | ||
| const args = `-loglevel error -f image2pipe -avioflags direct -fpsprobesize 0 -probesize 32 -analyzeduration 0 -c:v mjpeg -i pipe:0 -y -an -r ${fps} -c:v vp8 -qmin 0 -qmax 50 -crf 8 -deadline realtime -speed 8 -b:v 1M -threads 1 -vf pad=${w}:${h}:0:0:gray,crop=${w}:${h}:0:0`.split(' '); | ||
| const args = `-loglevel error -f matroska -fpsprobesize 0 -probesize 32 -analyzeduration 0 -i pipe:0 -y -an -r ${fps} -c:v vp8 -qmin 0 -qmax 50 -crf 8 -deadline realtime -speed 8 -b:v 1M -threads 1 -vf pad=${w}:${h}:0:0:gray,crop=${w}:${h}:0:0`.split(' '); | ||
| args.push(this._outputFile); | ||
|
|
||
| const { launchedProcess, gracefullyClose } = await launchProcess({ | ||
|
|
@@ -186,6 +188,7 @@ class FfmpegVideoRecorder { | |
| }); | ||
| this._process = launchedProcess; | ||
| this._gracefullyClose = gracefullyClose; | ||
| launchedProcess.stdin!.write(writeHeader(w, h)); | ||
|
yury-s marked this conversation as resolved.
|
||
| } | ||
|
|
||
| writeFrame(frame: Buffer, timestamp: number) { | ||
|
|
@@ -203,30 +206,25 @@ class FfmpegVideoRecorder { | |
|
|
||
| if (!this._firstFrameTimestamp) | ||
| this._firstFrameTimestamp = timestamp; | ||
| const timestampMs = Math.max(0, Math.round((timestamp - this._firstFrameTimestamp) * 1000)); | ||
|
|
||
| const frameNumber = Math.floor((timestamp - this._firstFrameTimestamp) * fps); | ||
| // The output is constant frame rate, so multiple input frames that map to the same output | ||
| // slot would be redundant - ffmpeg only keeps one of them. We coalesce them on our side to | ||
| // avoid muxing and piping frames that ffmpeg would just discard, keeping the most recent | ||
| // frame for each slot. The pending frame is only emitted once the slot is complete (a frame | ||
| // belonging to a later slot arrives, or recording stops), so that fast bursts of frames | ||
| // within a single slot still surface the latest pixels rather than the first ones. | ||
| const frameNumber = Math.floor(timestampMs * fps / 1000); | ||
| if (this._lastFrame && frameNumber !== this._lastFrame.frameNumber) | ||
| this._emitFrame(this._lastFrame.buffer, this._lastFrame.timestampMs); | ||
|
|
||
| if (this._lastFrame) { | ||
| const repeatCount = frameNumber - this._lastFrame.frameNumber; | ||
| for (let i = 0; i < repeatCount; ++i) | ||
| this._frameQueue.push(this._lastFrame.buffer); | ||
| this._lastWritePromise = this._lastWritePromise.then(() => this._sendFrames()); | ||
| } | ||
|
|
||
| this._lastFrame = { buffer: frame, timestamp, frameNumber }; | ||
| this._lastFrame = { buffer: frame, timestamp, frameNumber, timestampMs }; | ||
| this._lastWriteNodeTime = monotonicTime(); | ||
| } | ||
|
|
||
| private async _sendFrames() { | ||
| while (this._frameQueue.length) | ||
| await this._sendFrame(this._frameQueue.shift()!); | ||
| } | ||
|
|
||
| private async _sendFrame(frame: Buffer) { | ||
| return new Promise(f => this._process!.stdin!.write(frame, f)).then(error => { | ||
| if (error) | ||
| debugLogger.log('browser', `ffmpeg failed to write: ${String(error)}`); | ||
| }); | ||
| private _emitFrame(frame: Buffer, timestampMs: number) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand how ToT applies backpressure, I don't see any backpressure signal propagated to the browser in |
||
| this._process!.stdin!.write(writeClusterHeader(timestampMs, frame.length)); | ||
| this._process!.stdin!.write(frame); | ||
| } | ||
|
|
||
| async _stop() { | ||
|
|
@@ -237,16 +235,17 @@ class FfmpegVideoRecorder { | |
| if (this._isStopped) | ||
| return; | ||
| if (!this._lastFrame) { | ||
| // ffmpeg only creates a file upon some non-empty input | ||
| this._writeFrame(createWhiteImage(this._size.width, this._size.height), monotonicTime()); | ||
| // ffmpeg only creates a file upon some non-empty input. | ||
| this._writeFrame(createWhiteImage(this._size.width, this._size.height), monotonicTime() / 1000); | ||
| } | ||
| // Pad with at least 1s of the last frame in the end for convenience. | ||
| // This also ensures non-empty videos with 1 frame. | ||
| // Emit the last received frame at its own slot, then repeat it at the end so it stays visible | ||
| // for at least 1s. This also ensures non-empty videos with 1 frame and gives the output stream | ||
| // a final timestamp. | ||
| this._emitFrame(this._lastFrame!.buffer, this._lastFrame!.timestampMs); | ||
| const addTime = Math.max((monotonicTime() - this._lastWriteNodeTime) / 1000, 1); | ||
| this._writeFrame(Buffer.from([]), this._lastFrame!.timestamp + addTime); | ||
| this._emitFrame(this._lastFrame!.buffer, this._lastFrame!.timestampMs + Math.round(addTime * 1000)); | ||
| this._isStopped = true; | ||
| try { | ||
| await this._lastWritePromise; | ||
| await this._gracefullyClose!(); | ||
| } catch (e) { | ||
| debugLogger.log('error', `ffmpeg failed to stop: ${String(e)}`); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the way we ensure correctness of this code is to eyeball the resulting videos?