diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index da00763840a7..acc6e41cd460 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -342,11 +342,6 @@ type Response = { _chunks: Map>, _fromJSON: (key: string, value: JSONValue) => any, _stringDecoder: StringDecoder, - _rowState: RowParserState, - _rowID: number, // parts of a row ID parsed so far - _rowTag: number, // 0 indicates that we're currently parsing the row ID - _rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline. - _buffer: Array, // chunks received so far as part of this row _closed: boolean, _closedReason: mixed, _tempRefs: void | TemporaryReferenceSet, // the set temporary references can be resolved from @@ -2154,11 +2149,6 @@ function ResponseInstance( this._chunks = chunks; this._stringDecoder = createStringDecoder(); this._fromJSON = (null: any); - this._rowState = 0; - this._rowID = 0; - this._rowTag = 0; - this._rowLength = 0; - this._buffer = []; this._closed = false; this._closedReason = null; this._tempRefs = temporaryReferences; @@ -2259,6 +2249,24 @@ export function createResponse( ); } +export type StreamState = { + _rowState: RowParserState, + _rowID: number, // parts of a row ID parsed so far + _rowTag: number, // 0 indicates that we're currently parsing the row ID + _rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline. + _buffer: Array, // chunks received so far as part of this row +}; + +export function createStreamState(): StreamState { + return { + _rowState: 0, + _rowID: 0, + _rowTag: 0, + _rowLength: 0, + _buffer: [], + }; +} + function resolveDebugHalt(response: Response, id: number): void { const chunks = response._chunks; let chunk = chunks.get(id); @@ -3995,6 +4003,7 @@ function processFullStringRow( export function processBinaryChunk( weakResponse: WeakResponse, + streamState: StreamState, chunk: Uint8Array, ): void { if (hasGCedResponse(weakResponse)) { @@ -4003,11 +4012,11 @@ export function processBinaryChunk( } const response = unwrapWeakResponse(weakResponse); let i = 0; - let rowState = response._rowState; - let rowID = response._rowID; - let rowTag = response._rowTag; - let rowLength = response._rowLength; - const buffer = response._buffer; + let rowState = streamState._rowState; + let rowID = streamState._rowID; + let rowTag = streamState._rowTag; + let rowLength = streamState._rowLength; + const buffer = streamState._buffer; const chunkLength = chunk.length; while (i < chunkLength) { let lastIdx = -1; @@ -4112,14 +4121,15 @@ export function processBinaryChunk( break; } } - response._rowState = rowState; - response._rowID = rowID; - response._rowTag = rowTag; - response._rowLength = rowLength; + streamState._rowState = rowState; + streamState._rowID = rowID; + streamState._rowTag = rowTag; + streamState._rowLength = rowLength; } export function processStringChunk( weakResponse: WeakResponse, + streamState: StreamState, chunk: string, ): void { if (hasGCedResponse(weakResponse)) { @@ -4136,11 +4146,11 @@ export function processStringChunk( // here. Basically, only if Flight Server gave you this string as a chunk, // you can use it here. let i = 0; - let rowState = response._rowState; - let rowID = response._rowID; - let rowTag = response._rowTag; - let rowLength = response._rowLength; - const buffer = response._buffer; + let rowState = streamState._rowState; + let rowID = streamState._rowID; + let rowTag = streamState._rowTag; + let rowLength = streamState._rowLength; + const buffer = streamState._buffer; const chunkLength = chunk.length; while (i < chunkLength) { let lastIdx = -1; @@ -4264,10 +4274,10 @@ export function processStringChunk( ); } } - response._rowState = rowState; - response._rowID = rowID; - response._rowTag = rowTag; - response._rowLength = rowLength; + streamState._rowState = rowState; + streamState._rowID = rowID; + streamState._rowTag = rowTag; + streamState._rowLength = rowLength; } function parseModel(response: Response, json: UninitializedModel): T { diff --git a/packages/react-markup/src/ReactMarkupServer.js b/packages/react-markup/src/ReactMarkupServer.js index f144211711d7..34d234b0be44 100644 --- a/packages/react-markup/src/ReactMarkupServer.js +++ b/packages/react-markup/src/ReactMarkupServer.js @@ -25,6 +25,7 @@ import { import { createResponse as createFlightResponse, + createStreamState as createFlightStreamState, getRoot as getFlightRoot, processStringChunk as processFlightStringChunk, close as closeFlight, @@ -80,10 +81,11 @@ export function experimental_renderToHTML( options?: MarkupOptions, ): Promise { return new Promise((resolve, reject) => { + const streamState = createFlightStreamState(); const flightDestination = { push(chunk: string | null): boolean { if (chunk !== null) { - processFlightStringChunk(flightResponse, chunk); + processFlightStringChunk(flightResponse, streamState, chunk); } else { closeFlight(flightResponse); } diff --git a/packages/react-noop-renderer/src/ReactNoopFlightClient.js b/packages/react-noop-renderer/src/ReactNoopFlightClient.js index d4e6e6ea7936..3d8984a07fb1 100644 --- a/packages/react-noop-renderer/src/ReactNoopFlightClient.js +++ b/packages/react-noop-renderer/src/ReactNoopFlightClient.js @@ -24,35 +24,36 @@ type Source = Array; const decoderOptions = {stream: true}; -const {createResponse, processBinaryChunk, getRoot, close} = ReactFlightClient({ - createStringDecoder() { - return new TextDecoder(); - }, - readPartialStringChunk(decoder: TextDecoder, buffer: Uint8Array): string { - return decoder.decode(buffer, decoderOptions); - }, - readFinalStringChunk(decoder: TextDecoder, buffer: Uint8Array): string { - return decoder.decode(buffer); - }, - resolveClientReference(bundlerConfig: null, idx: string) { - return idx; - }, - prepareDestinationForModule(moduleLoading: null, metadata: string) {}, - preloadModule(idx: string) {}, - requireModule(idx: string) { - return readModule(idx); - }, - parseModel(response: Response, json) { - return JSON.parse(json, response._fromJSON); - }, - bindToConsole(methodName, args, badgeName) { - return Function.prototype.bind.apply( - // eslint-disable-next-line react-internal/no-production-logging - console[methodName], - [console].concat(args), - ); - }, -}); +const {createResponse, createStreamState, processBinaryChunk, getRoot, close} = + ReactFlightClient({ + createStringDecoder() { + return new TextDecoder(); + }, + readPartialStringChunk(decoder: TextDecoder, buffer: Uint8Array): string { + return decoder.decode(buffer, decoderOptions); + }, + readFinalStringChunk(decoder: TextDecoder, buffer: Uint8Array): string { + return decoder.decode(buffer); + }, + resolveClientReference(bundlerConfig: null, idx: string) { + return idx; + }, + prepareDestinationForModule(moduleLoading: null, metadata: string) {}, + preloadModule(idx: string) {}, + requireModule(idx: string) { + return readModule(idx); + }, + parseModel(response: Response, json) { + return JSON.parse(json, response._fromJSON); + }, + bindToConsole(methodName, args, badgeName) { + return Function.prototype.bind.apply( + // eslint-disable-next-line react-internal/no-production-logging + console[methodName], + [console].concat(args), + ); + }, + }); type ReadOptions = {| findSourceMapURL?: FindSourceMapURLCallback, @@ -76,8 +77,9 @@ function read(source: Source, options: ReadOptions): Thenable { ? options.debugChannel.onMessage : undefined, ); + const streamState = createStreamState(); for (let i = 0; i < source.length; i++) { - processBinaryChunk(response, source[i], 0); + processBinaryChunk(response, streamState, source[i], 0); } if (options !== undefined && options.close) { close(response); diff --git a/packages/react-server-dom-esm/src/client/ReactFlightDOMClientBrowser.js b/packages/react-server-dom-esm/src/client/ReactFlightDOMClientBrowser.js index 9e4a9efb58cc..c358694b6352 100644 --- a/packages/react-server-dom-esm/src/client/ReactFlightDOMClientBrowser.js +++ b/packages/react-server-dom-esm/src/client/ReactFlightDOMClientBrowser.js @@ -19,9 +19,11 @@ import type {ReactServerValue} from 'react-client/src/ReactFlightReplyClient'; import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, + processStringChunk, close, injectIntoDevTools, } from 'react-client/src/ReactFlightClient'; @@ -44,7 +46,7 @@ type CallServerCallback = (string, args: A) => Promise; export type Options = { moduleBaseURL?: string, callServer?: CallServerCallback, - debugChannel?: {writable?: WritableStream, ...}, + debugChannel?: {writable?: WritableStream, readable?: ReadableStream, ...}, temporaryReferences?: TemporaryReferenceSet, findSourceMapURL?: FindSourceMapURLCallback, replayConsoleLogs?: boolean, @@ -96,10 +98,50 @@ function createResponseFromOptions(options: void | Options) { ); } +function startReadingFromUniversalStream( + response: FlightResponse, + stream: ReadableStream, +): void { + // This is the same as startReadingFromStream except this allows WebSocketStreams which + // return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially + // always allow streams with variable chunk types. + const streamState = createStreamState(); + const reader = stream.getReader(); + function progress({ + done, + value, + }: { + done: boolean, + value: any, + ... + }): void | Promise { + if (done) { + close(response); + return; + } + if (value instanceof ArrayBuffer) { + // WebSockets can produce ArrayBuffer values in ReadableStreams. + processBinaryChunk(response, streamState, new Uint8Array(value)); + } else if (typeof value === 'string') { + // WebSockets can produce string values in ReadableStreams. + processStringChunk(response, streamState, value); + } else { + processBinaryChunk(response, streamState, value); + } + return reader.read().then(progress).catch(error); + } + function error(e: any) { + reportGlobalError(response, e); + } + reader.read().then(progress).catch(error); +} + function startReadingFromStream( response: FlightResponse, stream: ReadableStream, + isSecondaryStream: boolean, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -110,11 +152,14 @@ function startReadingFromStream( ... }): void | Promise { if (done) { - close(response); + // If we're the secondary stream, then we don't close the response until the debug channel closes. + if (!isSecondaryStream) { + close(response); + } return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { @@ -122,13 +167,22 @@ function startReadingFromStream( } reader.read().then(progress).catch(error); } - function createFromReadableStream( stream: ReadableStream, options?: Options, ): Thenable { const response: FlightResponse = createResponseFromOptions(options); - startReadingFromStream(response, stream); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream(response, options.debugChannel.readable); + startReadingFromStream(response, stream, true); + } else { + startReadingFromStream(response, stream, false); + } return getRoot(response); } @@ -139,7 +193,20 @@ function createFromFetch( const response: FlightResponse = createResponseFromOptions(options); promiseForResponse.then( function (r) { - startReadingFromStream(response, (r.body: any)); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream( + response, + options.debugChannel.readable, + ); + startReadingFromStream(response, (r.body: any), true); + } else { + startReadingFromStream(response, (r.body: any), false); + } }, function (e) { reportGlobalError(response, e); diff --git a/packages/react-server-dom-esm/src/client/ReactFlightDOMClientNode.js b/packages/react-server-dom-esm/src/client/ReactFlightDOMClientNode.js index 75c569e5ac4b..0af6d4e22e47 100644 --- a/packages/react-server-dom-esm/src/client/ReactFlightDOMClientNode.js +++ b/packages/react-server-dom-esm/src/client/ReactFlightDOMClientNode.js @@ -18,8 +18,10 @@ import type {Readable} from 'stream'; import { createResponse, + createStreamState, getRoot, reportGlobalError, + processStringChunk, processBinaryChunk, close, } from 'react-client/src/ReactFlightClient'; @@ -78,8 +80,13 @@ function createFromNodeStream( ? options.environmentName : undefined, ); + const streamState = createStreamState(); stream.on('data', chunk => { - processBinaryChunk(response, chunk); + if (typeof chunk === 'string') { + processStringChunk(response, streamState, chunk); + } else { + processBinaryChunk(response, streamState, chunk); + } }); stream.on('error', error => { reportGlobalError(response, error); diff --git a/packages/react-server-dom-esm/src/server/ReactFlightDOMServerNode.js b/packages/react-server-dom-esm/src/server/ReactFlightDOMServerNode.js index 6f530dd2b6fd..7e63fe9fd19a 100644 --- a/packages/react-server-dom-esm/src/server/ReactFlightDOMServerNode.js +++ b/packages/react-server-dom-esm/src/server/ReactFlightDOMServerNode.js @@ -27,6 +27,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -139,7 +140,7 @@ function startReadingFromDebugChannelReadable( } type Options = { - debugChannel?: Readable | Duplex | WebSocket, + debugChannel?: Readable | Writable | Duplex | WebSocket, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, onError?: (error: mixed) => void, @@ -159,6 +160,24 @@ function renderToPipeableStream( options?: Options, ): PipeableStream { const debugChannel = __DEV__ && options ? options.debugChannel : undefined; + const debugChannelReadable: void | Readable | WebSocket = + __DEV__ && + debugChannel !== undefined && + // $FlowFixMe[method-unbinding] + (typeof debugChannel.read === 'function' || + typeof debugChannel.readyState === 'number') + ? (debugChannel: any) + : undefined; + const debugChannelWritable: void | Writable = + __DEV__ && debugChannel !== undefined + ? // $FlowFixMe[method-unbinding] + typeof debugChannel.write === 'function' + ? (debugChannel: any) + : // $FlowFixMe[method-unbinding] + typeof debugChannel.send === 'function' + ? createFakeWritableFromWebSocket((debugChannel: any)) + : undefined + : undefined; const request = createRequest( model, moduleBasePath, @@ -172,8 +191,11 @@ function renderToPipeableStream( ); let hasStartedFlowing = false; startWork(request); - if (debugChannel !== undefined) { - startReadingFromDebugChannelReadable(request, debugChannel); + if (debugChannelWritable !== undefined) { + startFlowingDebug(request, debugChannelWritable); + } + if (debugChannelReadable !== undefined) { + startReadingFromDebugChannelReadable(request, debugChannelReadable); } return { pipe(destination: T): T { @@ -192,10 +214,13 @@ function renderToPipeableStream( 'The destination stream errored while writing data.', ), ); - destination.on( - 'close', - createCancelHandler(request, 'The destination stream closed early.'), - ); + // We don't close until the debug channel closes. + if (!__DEV__ || debugChannelReadable === undefined) { + destination.on( + 'close', + createCancelHandler(request, 'The destination stream closed early.'), + ); + } return destination; }, abort(reason: mixed) { @@ -204,6 +229,28 @@ function renderToPipeableStream( }; } +function createFakeWritableFromWebSocket(webSocket: WebSocket): Writable { + return ({ + write(chunk: string | Uint8Array) { + webSocket.send((chunk: any)); + return true; + }, + end() { + webSocket.close(); + }, + destroy(reason) { + if (typeof reason === 'object' && reason !== null) { + reason = reason.message; + } + if (typeof reason === 'string') { + webSocket.close(1011, reason); + } else { + webSocket.close(1011); + } + }, + }: any); +} + function createFakeWritable(readable: any): Writable { // The current host config expects a Writable so we create // a fake writable for now to push into the Readable. diff --git a/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientBrowser.js b/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientBrowser.js index 47098d94902d..2efa754e467e 100644 --- a/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientBrowser.js +++ b/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientBrowser.js @@ -17,9 +17,11 @@ import type {ServerReferenceId} from '../client/ReactFlightClientConfigBundlerPa import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, + processStringChunk, close, injectIntoDevTools, } from 'react-client/src/ReactFlightClient'; @@ -97,10 +99,50 @@ function createDebugCallbackFromWritableStream( }; } +function startReadingFromUniversalStream( + response: FlightResponse, + stream: ReadableStream, +): void { + // This is the same as startReadingFromStream except this allows WebSocketStreams which + // return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially + // always allow streams with variable chunk types. + const streamState = createStreamState(); + const reader = stream.getReader(); + function progress({ + done, + value, + }: { + done: boolean, + value: any, + ... + }): void | Promise { + if (done) { + close(response); + return; + } + if (value instanceof ArrayBuffer) { + // WebSockets can produce ArrayBuffer values in ReadableStreams. + processBinaryChunk(response, streamState, new Uint8Array(value)); + } else if (typeof value === 'string') { + // WebSockets can produce string values in ReadableStreams. + processStringChunk(response, streamState, value); + } else { + processBinaryChunk(response, streamState, value); + } + return reader.read().then(progress).catch(error); + } + function error(e: any) { + reportGlobalError(response, e); + } + reader.read().then(progress).catch(error); +} + function startReadingFromStream( response: FlightResponse, stream: ReadableStream, + isSecondaryStream: boolean, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -111,11 +153,14 @@ function startReadingFromStream( ... }): void | Promise { if (done) { - close(response); + // If we're the secondary stream, then we don't close the response until the debug channel closes. + if (!isSecondaryStream) { + close(response); + } return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { @@ -125,7 +170,7 @@ function startReadingFromStream( } export type Options = { - debugChannel?: {writable?: WritableStream, ...}, + debugChannel?: {writable?: WritableStream, readable?: ReadableStream, ...}, temporaryReferences?: TemporaryReferenceSet, replayConsoleLogs?: boolean, environmentName?: string, @@ -157,7 +202,17 @@ export function createFromReadableStream( ? createDebugCallbackFromWritableStream(options.debugChannel.writable) : undefined, ); - startReadingFromStream(response, stream); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream(response, options.debugChannel.readable); + startReadingFromStream(response, stream, true); + } else { + startReadingFromStream(response, stream, false); + } return getRoot(response); } @@ -189,7 +244,20 @@ export function createFromFetch( ); promiseForResponse.then( function (r) { - startReadingFromStream(response, (r.body: any)); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream( + response, + options.debugChannel.readable, + ); + startReadingFromStream(response, (r.body: any), true); + } else { + startReadingFromStream(response, (r.body: any), false); + } }, function (e) { reportGlobalError(response, e); diff --git a/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientEdge.js b/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientEdge.js index b1fbfed08f07..406ee54f7236 100644 --- a/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientEdge.js +++ b/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientEdge.js @@ -14,6 +14,7 @@ import type {ReactServerValue} from 'react-client/src/ReactFlightReplyClient'; import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, @@ -100,6 +101,7 @@ function startReadingFromStream( response: FlightResponse, stream: ReadableStream, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -114,7 +116,7 @@ function startReadingFromStream( return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { diff --git a/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientNode.js b/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientNode.js index b4b69443ccab..7b0507a7d2da 100644 --- a/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientNode.js +++ b/packages/react-server-dom-parcel/src/client/ReactFlightDOMClientNode.js @@ -13,8 +13,10 @@ import type {Readable} from 'stream'; import { createResponse, + createStreamState, getRoot, reportGlobalError, + processStringChunk, processBinaryChunk, close, } from 'react-client/src/ReactFlightClient'; @@ -70,8 +72,13 @@ export function createFromNodeStream( ? options.environmentName : undefined, ); + const streamState = createStreamState(); stream.on('data', chunk => { - processBinaryChunk(response, chunk); + if (typeof chunk === 'string') { + processStringChunk(response, streamState, chunk); + } else { + processBinaryChunk(response, streamState, chunk); + } }); stream.on('error', error => { reportGlobalError(response, error); diff --git a/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerBrowser.js b/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerBrowser.js index 988f5628a919..c2e7e6062edd 100644 --- a/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerBrowser.js +++ b/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerBrowser.js @@ -25,6 +25,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -59,7 +60,7 @@ export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTem export type {TemporaryReferenceSet}; type Options = { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, identifierPrefix?: string, @@ -118,6 +119,10 @@ export function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, null, @@ -141,6 +146,19 @@ export function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + const debugStream = new ReadableStream( + { + type: 'bytes', + pull: (controller): ?Promise => { + startFlowingDebug(request, controller); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerEdge.js b/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerEdge.js index 54d9a78c5f92..ba66282f1b06 100644 --- a/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerEdge.js +++ b/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerEdge.js @@ -27,6 +27,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -64,7 +65,7 @@ export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTem export type {TemporaryReferenceSet}; type Options = { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, identifierPrefix?: string, @@ -123,6 +124,10 @@ export function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, null, @@ -146,6 +151,19 @@ export function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + const debugStream = new ReadableStream( + { + type: 'bytes', + pull: (controller): ?Promise => { + startFlowingDebug(request, controller); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerNode.js b/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerNode.js index abdd6452793d..79f783ea4701 100644 --- a/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerNode.js +++ b/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerNode.js @@ -31,6 +31,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -152,7 +153,7 @@ function startReadingFromDebugChannelReadable( } type Options = { - debugChannel?: Readable | Duplex | WebSocket, + debugChannel?: Readable | Writable | Duplex | WebSocket, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, onError?: (error: mixed) => void, @@ -171,6 +172,24 @@ export function renderToPipeableStream( options?: Options, ): PipeableStream { const debugChannel = __DEV__ && options ? options.debugChannel : undefined; + const debugChannelReadable: void | Readable | WebSocket = + __DEV__ && + debugChannel !== undefined && + // $FlowFixMe[method-unbinding] + (typeof debugChannel.read === 'function' || + typeof debugChannel.readyState === 'number') + ? (debugChannel: any) + : undefined; + const debugChannelWritable: void | Writable = + __DEV__ && debugChannel !== undefined + ? // $FlowFixMe[method-unbinding] + typeof debugChannel.write === 'function' + ? (debugChannel: any) + : // $FlowFixMe[method-unbinding] + typeof debugChannel.send === 'function' + ? createFakeWritableFromWebSocket((debugChannel: any)) + : undefined + : undefined; const request = createRequest( model, null, @@ -184,8 +203,11 @@ export function renderToPipeableStream( ); let hasStartedFlowing = false; startWork(request); - if (debugChannel !== undefined) { - startReadingFromDebugChannelReadable(request, debugChannel); + if (debugChannelWritable !== undefined) { + startFlowingDebug(request, debugChannelWritable); + } + if (debugChannelReadable !== undefined) { + startReadingFromDebugChannelReadable(request, debugChannelReadable); } return { pipe(destination: T): T { @@ -204,10 +226,13 @@ export function renderToPipeableStream( 'The destination stream errored while writing data.', ), ); - destination.on( - 'close', - createCancelHandler(request, 'The destination stream closed early.'), - ); + // We don't close until the debug channel closes. + if (!__DEV__ || debugChannelReadable === undefined) { + destination.on( + 'close', + createCancelHandler(request, 'The destination stream closed early.'), + ); + } return destination; }, abort(reason: mixed) { @@ -216,6 +241,28 @@ export function renderToPipeableStream( }; } +function createFakeWritableFromWebSocket(webSocket: WebSocket): Writable { + return ({ + write(chunk: string | Uint8Array) { + webSocket.send((chunk: any)); + return true; + }, + end() { + webSocket.close(); + }, + destroy(reason) { + if (typeof reason === 'object' && reason !== null) { + reason = reason.message; + } + if (typeof reason === 'string') { + webSocket.close(1011, reason); + } else { + webSocket.close(1011); + } + }, + }: any); +} + function createFakeWritableFromReadableStreamController( controller: ReadableStreamController, ): Writable { @@ -289,7 +336,7 @@ function startReadingFromDebugChannelReadableStream( export function renderToReadableStream( model: ReactClientValue, options?: Omit & { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, signal?: AbortSignal, }, ): ReadableStream { @@ -297,6 +344,10 @@ export function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, null, @@ -320,6 +371,24 @@ export function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + let debugWritable: Writable; + const debugStream = new ReadableStream( + { + type: 'bytes', + start: (controller): ?Promise => { + debugWritable = + createFakeWritableFromReadableStreamController(controller); + }, + pull: (controller): ?Promise => { + startFlowingDebug(request, debugWritable); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientBrowser.js b/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientBrowser.js index 0a3b6cedc822..a9f31f1b765d 100644 --- a/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientBrowser.js +++ b/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientBrowser.js @@ -19,9 +19,11 @@ import type {ReactServerValue} from 'react-client/src/ReactFlightReplyClient'; import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, + processStringChunk, close, injectIntoDevTools, } from 'react-client/src/ReactFlightClient'; @@ -43,7 +45,7 @@ type CallServerCallback = (string, args: A) => Promise; export type Options = { callServer?: CallServerCallback, - debugChannel?: {writable?: WritableStream, ...}, + debugChannel?: {writable?: WritableStream, readable?: ReadableStream, ...}, temporaryReferences?: TemporaryReferenceSet, findSourceMapURL?: FindSourceMapURLCallback, replayConsoleLogs?: boolean, @@ -95,10 +97,50 @@ function createResponseFromOptions(options: void | Options) { ); } +function startReadingFromUniversalStream( + response: FlightResponse, + stream: ReadableStream, +): void { + // This is the same as startReadingFromStream except this allows WebSocketStreams which + // return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially + // always allow streams with variable chunk types. + const streamState = createStreamState(); + const reader = stream.getReader(); + function progress({ + done, + value, + }: { + done: boolean, + value: any, + ... + }): void | Promise { + if (done) { + close(response); + return; + } + if (value instanceof ArrayBuffer) { + // WebSockets can produce ArrayBuffer values in ReadableStreams. + processBinaryChunk(response, streamState, new Uint8Array(value)); + } else if (typeof value === 'string') { + // WebSockets can produce string values in ReadableStreams. + processStringChunk(response, streamState, value); + } else { + processBinaryChunk(response, streamState, value); + } + return reader.read().then(progress).catch(error); + } + function error(e: any) { + reportGlobalError(response, e); + } + reader.read().then(progress).catch(error); +} + function startReadingFromStream( response: FlightResponse, stream: ReadableStream, + isSecondaryStream: boolean, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -109,11 +151,14 @@ function startReadingFromStream( ... }): void | Promise { if (done) { - close(response); + // If we're the secondary stream, then we don't close the response until the debug channel closes. + if (!isSecondaryStream) { + close(response); + } return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { @@ -127,7 +172,17 @@ function createFromReadableStream( options?: Options, ): Thenable { const response: FlightResponse = createResponseFromOptions(options); - startReadingFromStream(response, stream); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream(response, options.debugChannel.readable); + startReadingFromStream(response, stream, true); + } else { + startReadingFromStream(response, stream, false); + } return getRoot(response); } @@ -138,7 +193,20 @@ function createFromFetch( const response: FlightResponse = createResponseFromOptions(options); promiseForResponse.then( function (r) { - startReadingFromStream(response, (r.body: any)); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream( + response, + options.debugChannel.readable, + ); + startReadingFromStream(response, (r.body: any), true); + } else { + startReadingFromStream(response, (r.body: any), false); + } }, function (e) { reportGlobalError(response, e); diff --git a/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientEdge.js b/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientEdge.js index 48cb0dd4db13..81f3813c433f 100644 --- a/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientEdge.js +++ b/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientEdge.js @@ -30,6 +30,7 @@ type ServerConsumerManifest = { import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, @@ -104,6 +105,7 @@ function startReadingFromStream( response: FlightResponse, stream: ReadableStream, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -118,7 +120,7 @@ function startReadingFromStream( return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { diff --git a/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientNode.js b/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientNode.js index fbdf5b49e7c9..38c26827e4ca 100644 --- a/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientNode.js +++ b/packages/react-server-dom-turbopack/src/client/ReactFlightDOMClientNode.js @@ -30,8 +30,10 @@ import type {Readable} from 'stream'; import { createResponse, + createStreamState, getRoot, reportGlobalError, + processStringChunk, processBinaryChunk, close, } from 'react-client/src/ReactFlightClient'; @@ -80,8 +82,13 @@ function createFromNodeStream( ? options.environmentName : undefined, ); + const streamState = createStreamState(); stream.on('data', chunk => { - processBinaryChunk(response, chunk); + if (typeof chunk === 'string') { + processStringChunk(response, streamState, chunk); + } else { + processBinaryChunk(response, streamState, chunk); + } }); stream.on('error', error => { reportGlobalError(response, error); diff --git a/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerBrowser.js b/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerBrowser.js index 0f09f82b90d7..83add4abd972 100644 --- a/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerBrowser.js +++ b/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerBrowser.js @@ -20,6 +20,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -56,7 +57,7 @@ export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTem export type {TemporaryReferenceSet}; type Options = { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, identifierPrefix?: string, @@ -116,6 +117,10 @@ function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, turbopackMap, @@ -139,6 +144,19 @@ function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + const debugStream = new ReadableStream( + { + type: 'bytes', + pull: (controller): ?Promise => { + startFlowingDebug(request, controller); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerEdge.js b/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerEdge.js index 07ed44059f07..54f3d773318e 100644 --- a/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerEdge.js +++ b/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerEdge.js @@ -22,6 +22,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -61,7 +62,7 @@ export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTem export type {TemporaryReferenceSet}; type Options = { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, identifierPrefix?: string, @@ -121,6 +122,10 @@ function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, turbopackMap, @@ -144,6 +149,19 @@ function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + const debugStream = new ReadableStream( + { + type: 'bytes', + pull: (controller): ?Promise => { + startFlowingDebug(request, controller); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerNode.js b/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerNode.js index 8e18f7190985..4cea9ca149ad 100644 --- a/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerNode.js +++ b/packages/react-server-dom-turbopack/src/server/ReactFlightDOMServerNode.js @@ -29,6 +29,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -145,7 +146,7 @@ function startReadingFromDebugChannelReadable( } type Options = { - debugChannel?: Readable | Duplex | WebSocket, + debugChannel?: Readable | Writable | Duplex | WebSocket, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, onError?: (error: mixed) => void, @@ -165,6 +166,24 @@ function renderToPipeableStream( options?: Options, ): PipeableStream { const debugChannel = __DEV__ && options ? options.debugChannel : undefined; + const debugChannelReadable: void | Readable | WebSocket = + __DEV__ && + debugChannel !== undefined && + // $FlowFixMe[method-unbinding] + (typeof debugChannel.read === 'function' || + typeof debugChannel.readyState === 'number') + ? (debugChannel: any) + : undefined; + const debugChannelWritable: void | Writable = + __DEV__ && debugChannel !== undefined + ? // $FlowFixMe[method-unbinding] + typeof debugChannel.write === 'function' + ? (debugChannel: any) + : // $FlowFixMe[method-unbinding] + typeof debugChannel.send === 'function' + ? createFakeWritableFromWebSocket((debugChannel: any)) + : undefined + : undefined; const request = createRequest( model, turbopackMap, @@ -178,8 +197,11 @@ function renderToPipeableStream( ); let hasStartedFlowing = false; startWork(request); - if (debugChannel !== undefined) { - startReadingFromDebugChannelReadable(request, debugChannel); + if (debugChannelWritable !== undefined) { + startFlowingDebug(request, debugChannelWritable); + } + if (debugChannelReadable !== undefined) { + startReadingFromDebugChannelReadable(request, debugChannelReadable); } return { pipe(destination: T): T { @@ -198,10 +220,13 @@ function renderToPipeableStream( 'The destination stream errored while writing data.', ), ); - destination.on( - 'close', - createCancelHandler(request, 'The destination stream closed early.'), - ); + // We don't close until the debug channel closes. + if (!__DEV__ || debugChannelReadable === undefined) { + destination.on( + 'close', + createCancelHandler(request, 'The destination stream closed early.'), + ); + } return destination; }, abort(reason: mixed) { @@ -210,6 +235,28 @@ function renderToPipeableStream( }; } +function createFakeWritableFromWebSocket(webSocket: WebSocket): Writable { + return ({ + write(chunk: string | Uint8Array) { + webSocket.send((chunk: any)); + return true; + }, + end() { + webSocket.close(); + }, + destroy(reason) { + if (typeof reason === 'object' && reason !== null) { + reason = reason.message; + } + if (typeof reason === 'string') { + webSocket.close(1011, reason); + } else { + webSocket.close(1011); + } + }, + }: any); +} + function createFakeWritableFromReadableStreamController( controller: ReadableStreamController, ): Writable { @@ -284,7 +331,7 @@ function renderToReadableStream( model: ReactClientValue, turbopackMap: ClientManifest, options?: Omit & { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, signal?: AbortSignal, }, ): ReadableStream { @@ -292,6 +339,10 @@ function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, turbopackMap, @@ -315,6 +366,24 @@ function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + let debugWritable: Writable; + const debugStream = new ReadableStream( + { + type: 'bytes', + start: (controller): ?Promise => { + debugWritable = + createFakeWritableFromReadableStreamController(controller); + }, + pull: (controller): ?Promise => { + startFlowingDebug(request, debugWritable); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientBrowser.js b/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientBrowser.js index 0a3b6cedc822..a9f31f1b765d 100644 --- a/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientBrowser.js +++ b/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientBrowser.js @@ -19,9 +19,11 @@ import type {ReactServerValue} from 'react-client/src/ReactFlightReplyClient'; import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, + processStringChunk, close, injectIntoDevTools, } from 'react-client/src/ReactFlightClient'; @@ -43,7 +45,7 @@ type CallServerCallback = (string, args: A) => Promise; export type Options = { callServer?: CallServerCallback, - debugChannel?: {writable?: WritableStream, ...}, + debugChannel?: {writable?: WritableStream, readable?: ReadableStream, ...}, temporaryReferences?: TemporaryReferenceSet, findSourceMapURL?: FindSourceMapURLCallback, replayConsoleLogs?: boolean, @@ -95,10 +97,50 @@ function createResponseFromOptions(options: void | Options) { ); } +function startReadingFromUniversalStream( + response: FlightResponse, + stream: ReadableStream, +): void { + // This is the same as startReadingFromStream except this allows WebSocketStreams which + // return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially + // always allow streams with variable chunk types. + const streamState = createStreamState(); + const reader = stream.getReader(); + function progress({ + done, + value, + }: { + done: boolean, + value: any, + ... + }): void | Promise { + if (done) { + close(response); + return; + } + if (value instanceof ArrayBuffer) { + // WebSockets can produce ArrayBuffer values in ReadableStreams. + processBinaryChunk(response, streamState, new Uint8Array(value)); + } else if (typeof value === 'string') { + // WebSockets can produce string values in ReadableStreams. + processStringChunk(response, streamState, value); + } else { + processBinaryChunk(response, streamState, value); + } + return reader.read().then(progress).catch(error); + } + function error(e: any) { + reportGlobalError(response, e); + } + reader.read().then(progress).catch(error); +} + function startReadingFromStream( response: FlightResponse, stream: ReadableStream, + isSecondaryStream: boolean, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -109,11 +151,14 @@ function startReadingFromStream( ... }): void | Promise { if (done) { - close(response); + // If we're the secondary stream, then we don't close the response until the debug channel closes. + if (!isSecondaryStream) { + close(response); + } return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { @@ -127,7 +172,17 @@ function createFromReadableStream( options?: Options, ): Thenable { const response: FlightResponse = createResponseFromOptions(options); - startReadingFromStream(response, stream); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream(response, options.debugChannel.readable); + startReadingFromStream(response, stream, true); + } else { + startReadingFromStream(response, stream, false); + } return getRoot(response); } @@ -138,7 +193,20 @@ function createFromFetch( const response: FlightResponse = createResponseFromOptions(options); promiseForResponse.then( function (r) { - startReadingFromStream(response, (r.body: any)); + if ( + __DEV__ && + options && + options.debugChannel && + options.debugChannel.readable + ) { + startReadingFromUniversalStream( + response, + options.debugChannel.readable, + ); + startReadingFromStream(response, (r.body: any), true); + } else { + startReadingFromStream(response, (r.body: any), false); + } }, function (e) { reportGlobalError(response, e); diff --git a/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientEdge.js b/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientEdge.js index 48cb0dd4db13..81f3813c433f 100644 --- a/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientEdge.js +++ b/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientEdge.js @@ -30,6 +30,7 @@ type ServerConsumerManifest = { import { createResponse, + createStreamState, getRoot, reportGlobalError, processBinaryChunk, @@ -104,6 +105,7 @@ function startReadingFromStream( response: FlightResponse, stream: ReadableStream, ): void { + const streamState = createStreamState(); const reader = stream.getReader(); function progress({ done, @@ -118,7 +120,7 @@ function startReadingFromStream( return; } const buffer: Uint8Array = (value: any); - processBinaryChunk(response, buffer); + processBinaryChunk(response, streamState, buffer); return reader.read().then(progress).catch(error); } function error(e: any) { diff --git a/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientNode.js b/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientNode.js index b5d59ceace2d..38c26827e4ca 100644 --- a/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientNode.js +++ b/packages/react-server-dom-webpack/src/client/ReactFlightDOMClientNode.js @@ -30,6 +30,7 @@ import type {Readable} from 'stream'; import { createResponse, + createStreamState, getRoot, reportGlobalError, processStringChunk, @@ -81,11 +82,12 @@ function createFromNodeStream( ? options.environmentName : undefined, ); + const streamState = createStreamState(); stream.on('data', chunk => { if (typeof chunk === 'string') { - processStringChunk(response, chunk); + processStringChunk(response, streamState, chunk); } else { - processBinaryChunk(response, chunk); + processBinaryChunk(response, streamState, chunk); } }); stream.on('error', error => { diff --git a/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerBrowser.js b/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerBrowser.js index e2576eafecc1..06f837d431b4 100644 --- a/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerBrowser.js +++ b/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerBrowser.js @@ -20,6 +20,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -56,7 +57,7 @@ export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTem export type {TemporaryReferenceSet}; type Options = { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, identifierPrefix?: string, @@ -116,6 +117,10 @@ function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, webpackMap, @@ -139,6 +144,19 @@ function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + const debugStream = new ReadableStream( + { + type: 'bytes', + pull: (controller): ?Promise => { + startFlowingDebug(request, controller); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerEdge.js b/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerEdge.js index e871cfb9e9ed..806a08700054 100644 --- a/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerEdge.js +++ b/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerEdge.js @@ -22,6 +22,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -61,7 +62,7 @@ export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTem export type {TemporaryReferenceSet}; type Options = { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, identifierPrefix?: string, @@ -121,6 +122,10 @@ function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, webpackMap, @@ -144,6 +149,19 @@ function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + const debugStream = new ReadableStream( + { + type: 'bytes', + pull: (controller): ?Promise => { + startFlowingDebug(request, controller); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerNode.js b/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerNode.js index 7bac80292fe5..0b6669a3ba18 100644 --- a/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerNode.js +++ b/packages/react-server-dom-webpack/src/server/ReactFlightDOMServerNode.js @@ -29,6 +29,7 @@ import { createPrerenderRequest, startWork, startFlowing, + startFlowingDebug, stopFlowing, abort, resolveDebugMessage, @@ -145,7 +146,7 @@ function startReadingFromDebugChannelReadable( } type Options = { - debugChannel?: Readable | Duplex | WebSocket, + debugChannel?: Readable | Writable | Duplex | WebSocket, environmentName?: string | (() => string), filterStackFrame?: (url: string, functionName: string) => boolean, onError?: (error: mixed) => void, @@ -165,6 +166,24 @@ function renderToPipeableStream( options?: Options, ): PipeableStream { const debugChannel = __DEV__ && options ? options.debugChannel : undefined; + const debugChannelReadable: void | Readable | WebSocket = + __DEV__ && + debugChannel !== undefined && + // $FlowFixMe[method-unbinding] + (typeof debugChannel.read === 'function' || + typeof debugChannel.readyState === 'number') + ? (debugChannel: any) + : undefined; + const debugChannelWritable: void | Writable = + __DEV__ && debugChannel !== undefined + ? // $FlowFixMe[method-unbinding] + typeof debugChannel.write === 'function' + ? (debugChannel: any) + : // $FlowFixMe[method-unbinding] + typeof debugChannel.send === 'function' + ? createFakeWritableFromWebSocket((debugChannel: any)) + : undefined + : undefined; const request = createRequest( model, webpackMap, @@ -174,12 +193,15 @@ function renderToPipeableStream( options ? options.temporaryReferences : undefined, __DEV__ && options ? options.environmentName : undefined, __DEV__ && options ? options.filterStackFrame : undefined, - debugChannel !== undefined, + debugChannelReadable !== undefined, ); let hasStartedFlowing = false; startWork(request); - if (debugChannel !== undefined) { - startReadingFromDebugChannelReadable(request, debugChannel); + if (debugChannelWritable !== undefined) { + startFlowingDebug(request, debugChannelWritable); + } + if (debugChannelReadable !== undefined) { + startReadingFromDebugChannelReadable(request, debugChannelReadable); } return { pipe(destination: T): T { @@ -198,10 +220,13 @@ function renderToPipeableStream( 'The destination stream errored while writing data.', ), ); - destination.on( - 'close', - createCancelHandler(request, 'The destination stream closed early.'), - ); + // We don't close until the debug channel closes. + if (!__DEV__ || debugChannelReadable === undefined) { + destination.on( + 'close', + createCancelHandler(request, 'The destination stream closed early.'), + ); + } return destination; }, abort(reason: mixed) { @@ -210,6 +235,28 @@ function renderToPipeableStream( }; } +function createFakeWritableFromWebSocket(webSocket: WebSocket): Writable { + return ({ + write(chunk: string | Uint8Array) { + webSocket.send((chunk: any)); + return true; + }, + end() { + webSocket.close(); + }, + destroy(reason) { + if (typeof reason === 'object' && reason !== null) { + reason = reason.message; + } + if (typeof reason === 'string') { + webSocket.close(1011, reason); + } else { + webSocket.close(1011); + } + }, + }: any); +} + function createFakeWritableFromReadableStreamController( controller: ReadableStreamController, ): Writable { @@ -284,7 +331,7 @@ function renderToReadableStream( model: ReactClientValue, webpackMap: ClientManifest, options?: Omit & { - debugChannel?: {readable?: ReadableStream, ...}, + debugChannel?: {readable?: ReadableStream, writable?: WritableStream, ...}, signal?: AbortSignal, }, ): ReadableStream { @@ -292,6 +339,10 @@ function renderToReadableStream( __DEV__ && options && options.debugChannel ? options.debugChannel.readable : undefined; + const debugChannelWritable = + __DEV__ && options && options.debugChannel + ? options.debugChannel.writable + : undefined; const request = createRequest( model, webpackMap, @@ -315,6 +366,24 @@ function renderToReadableStream( signal.addEventListener('abort', listener); } } + if (debugChannelWritable !== undefined) { + let debugWritable: Writable; + const debugStream = new ReadableStream( + { + type: 'bytes', + start: (controller): ?Promise => { + debugWritable = + createFakeWritableFromReadableStreamController(controller); + }, + pull: (controller): ?Promise => { + startFlowingDebug(request, debugWritable); + }, + }, + // $FlowFixMe[prop-missing] size() methods are not allowed on byte streams. + {highWaterMark: 0}, + ); + debugStream.pipeTo(debugChannelWritable); + } if (debugChannelReadable !== undefined) { startReadingFromDebugChannelReadableStream(request, debugChannelReadable); } diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 1b31704919df..0a820b30726a 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -519,6 +519,7 @@ export type Request = { // DEV-only pendingDebugChunks: number, completedDebugChunks: Array, + debugDestination: null | Destination, environmentName: () => string, filterStackFrame: ( url: string, @@ -639,6 +640,7 @@ function RequestInstance( if (__DEV__) { this.pendingDebugChunks = 0; this.completedDebugChunks = ([]: Array); + this.debugDestination = null; this.environmentName = environmentName === undefined ? () => 'Server' @@ -1519,7 +1521,7 @@ function renderFunctionComponent( const componentName = (Component: any).displayName || Component.name || ''; const componentEnv = (0, request.environmentName)(); - request.pendingDebugChunks++; + request.pendingChunks++; componentDebugInfo = ({ name: componentName, env: componentEnv, @@ -2274,7 +2276,7 @@ function visitAsyncNode( const env = (0, request.environmentName)(); advanceTaskTime(request, task, startTime); // Then emit a reference to us awaiting it in the current task. - request.pendingDebugChunks++; + request.pendingChunks++; emitDebugChunk(request, task.id, { awaited: ((ioNode: any): ReactIOInfo), // This is deduped by this reference. env: env, @@ -2334,7 +2336,7 @@ function emitAsyncSequence( } else if (awaitedNode !== null) { // Nothing in user space (unfiltered stack) awaited this. serializeIONode(request, awaitedNode, awaitedNode.promise); - request.pendingDebugChunks++; + request.pendingChunks++; // We log the environment at the time when we ping which may be later than what the // environment was when we actually started awaiting. const env = (0, request.environmentName)(); @@ -4021,9 +4023,19 @@ function emitDebugChunk( } const json: string = serializeDebugModel(request, 500, debugInfo); - const row = serializeRowHeader('D', id) + json + '\n'; - const processedChunk = stringToChunk(row); - request.completedDebugChunks.push(processedChunk); + if (request.debugDestination !== null) { + // Outline the actual timing information to the debug channel. + const outlinedId = request.nextChunkId++; + const debugRow = outlinedId.toString(16) + ':' + json + '\n'; + request.pendingDebugChunks++; + request.completedDebugChunks.push(stringToChunk(debugRow)); + const row = + serializeRowHeader('D', id) + '"$' + outlinedId.toString(16) + '"\n'; + request.completedRegularChunks.push(stringToChunk(row)); + } else { + const row = serializeRowHeader('D', id) + json + '\n'; + request.completedRegularChunks.push(stringToChunk(row)); + } } function outlineComponentInfo( @@ -4941,7 +4953,7 @@ function forwardDebugInfo( // being no references to this as an owner. outlineComponentInfo(request, (info: any)); // Emit a reference to the outlined one. - request.pendingDebugChunks++; + request.pendingChunks++; emitDebugChunk(request, id, info); } else if (info.awaited) { const ioInfo = info.awaited; @@ -4982,11 +4994,11 @@ function forwardDebugInfo( // $FlowFixMe[cannot-write] debugAsyncInfo.stack = debugStack; } - request.pendingDebugChunks++; + request.pendingChunks++; emitDebugChunk(request, id, debugAsyncInfo); } } else { - request.pendingDebugChunks++; + request.pendingChunks++; emitDebugChunk(request, id, info); } } @@ -5088,7 +5100,7 @@ function forwardDebugInfoFromAbortedTask(request: Request, task: Task): void { // complete in time before aborting. // The best we can do is try to emit the stack of where this Promise was created. serializeIONode(request, node, null); - request.pendingDebugChunks++; + request.pendingChunks++; const env = (0, request.environmentName)(); const asyncInfo: ReactAsyncInfo = { awaited: ((node: any): ReactIOInfo), // This is deduped by this reference. @@ -5117,13 +5129,22 @@ function emitTimingChunk( if (!enableProfilerTimer || !enableComponentPerformanceTrack) { return; } - request.pendingDebugChunks++; + request.pendingChunks++; const relativeTimestamp = timestamp - request.timeOrigin; - const row = - serializeRowHeader('D', id) + '{"time":' + relativeTimestamp + '}\n'; - const processedChunk = stringToChunk(row); - // TODO: Move to its own priority queue. - request.completedDebugChunks.push(processedChunk); + const json = '{"time":' + relativeTimestamp + '}'; + if (request.debugDestination !== null) { + // Outline the actual timing information to the debug channel. + const outlinedId = request.nextChunkId++; + const debugRow = outlinedId.toString(16) + ':' + json + '\n'; + request.pendingDebugChunks++; + request.completedDebugChunks.push(stringToChunk(debugRow)); + const row = + serializeRowHeader('D', id) + '"$' + outlinedId.toString(16) + '"\n'; + request.completedRegularChunks.push(stringToChunk(row)); + } else { + const row = serializeRowHeader('D', id) + json + '\n'; + request.completedRegularChunks.push(stringToChunk(row)); + } } function advanceTaskTime( @@ -5329,7 +5350,7 @@ function retryTask(request: Request, task: Task): void { if (__DEV__) { const currentEnv = (0, request.environmentName)(); if (currentEnv !== task.environmentName) { - request.pendingDebugChunks++; + request.pendingChunks++; // The environment changed since we last emitted any debug information for this // task. We emit an entry that just includes the environment name change. emitDebugChunk(request, task.id, {env: currentEnv}); @@ -5444,9 +5465,7 @@ function performWork(request: Request): void { const task = pingedTasks[i]; retryTask(request, task); } - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + flushCompletedChunks(request); } catch (error) { logRecoverableError(request, error, null); fatalError(request, error); @@ -5507,50 +5526,49 @@ function finishHaltedTask(task: Task, request: Request): void { request.pendingChunks--; } -function flushCompletedChunks( - request: Request, - destination: Destination, -): void { - beginWriting(destination); - try { - // We emit module chunks first in the stream so that - // they can be preloaded as early as possible. - const importsChunks = request.completedImportChunks; - let i = 0; - for (; i < importsChunks.length; i++) { - request.pendingChunks--; - const chunk = importsChunks[i]; - const keepWriting: boolean = writeChunkAndReturn(destination, chunk); - if (!keepWriting) { - request.destination = null; - i++; - break; +function flushCompletedChunks(request: Request): void { + if (__DEV__ && request.debugDestination !== null) { + const debugDestination = request.debugDestination; + beginWriting(debugDestination); + try { + const debugChunks = request.completedDebugChunks; + let i = 0; + for (; i < debugChunks.length; i++) { + request.pendingDebugChunks--; + const chunk = debugChunks[i]; + writeChunkAndReturn(debugDestination, chunk); } + debugChunks.splice(0, i); + } finally { + completeWriting(debugDestination); } - importsChunks.splice(0, i); - - // Next comes hints. - const hintChunks = request.completedHintChunks; - i = 0; - for (; i < hintChunks.length; i++) { - const chunk = hintChunks[i]; - const keepWriting: boolean = writeChunkAndReturn(destination, chunk); - if (!keepWriting) { - request.destination = null; - i++; - break; + flushBuffered(debugDestination); + } + const destination = request.destination; + if (destination !== null) { + beginWriting(destination); + try { + // We emit module chunks first in the stream so that + // they can be preloaded as early as possible. + const importsChunks = request.completedImportChunks; + let i = 0; + for (; i < importsChunks.length; i++) { + request.pendingChunks--; + const chunk = importsChunks[i]; + const keepWriting: boolean = writeChunkAndReturn(destination, chunk); + if (!keepWriting) { + request.destination = null; + i++; + break; + } } - } - hintChunks.splice(0, i); + importsChunks.splice(0, i); - // Debug meta data comes before the model data because it will often end up blocking the model from - // completing since the JSX will reference the debug data. - if (__DEV__) { - const debugChunks = request.completedDebugChunks; + // Next comes hints. + const hintChunks = request.completedHintChunks; i = 0; - for (; i < debugChunks.length; i++) { - request.pendingDebugChunks--; - const chunk = debugChunks[i]; + for (; i < hintChunks.length; i++) { + const chunk = hintChunks[i]; const keepWriting: boolean = writeChunkAndReturn(destination, chunk); if (!keepWriting) { request.destination = null; @@ -5558,49 +5576,89 @@ function flushCompletedChunks( break; } } - debugChunks.splice(0, i); - } + hintChunks.splice(0, i); + + // Debug meta data comes before the model data because it will often end up blocking the model from + // completing since the JSX will reference the debug data. + if (__DEV__ && request.debugDestination === null) { + const debugChunks = request.completedDebugChunks; + i = 0; + for (; i < debugChunks.length; i++) { + request.pendingDebugChunks--; + const chunk = debugChunks[i]; + const keepWriting: boolean = writeChunkAndReturn(destination, chunk); + if (!keepWriting) { + request.destination = null; + i++; + break; + } + } + debugChunks.splice(0, i); + } - // Next comes model data. - const regularChunks = request.completedRegularChunks; - i = 0; - for (; i < regularChunks.length; i++) { - request.pendingChunks--; - const chunk = regularChunks[i]; - const keepWriting: boolean = writeChunkAndReturn(destination, chunk); - if (!keepWriting) { - request.destination = null; - i++; - break; + // Next comes model data. + const regularChunks = request.completedRegularChunks; + i = 0; + for (; i < regularChunks.length; i++) { + request.pendingChunks--; + const chunk = regularChunks[i]; + const keepWriting: boolean = writeChunkAndReturn(destination, chunk); + if (!keepWriting) { + request.destination = null; + i++; + break; + } } - } - regularChunks.splice(0, i); - - // Finally, errors are sent. The idea is that it's ok to delay - // any error messages and prioritize display of other parts of - // the page. - const errorChunks = request.completedErrorChunks; - i = 0; - for (; i < errorChunks.length; i++) { - request.pendingChunks--; - const chunk = errorChunks[i]; - const keepWriting: boolean = writeChunkAndReturn(destination, chunk); - if (!keepWriting) { - request.destination = null; - i++; - break; + regularChunks.splice(0, i); + + // Finally, errors are sent. The idea is that it's ok to delay + // any error messages and prioritize display of other parts of + // the page. + const errorChunks = request.completedErrorChunks; + i = 0; + for (; i < errorChunks.length; i++) { + request.pendingChunks--; + const chunk = errorChunks[i]; + const keepWriting: boolean = writeChunkAndReturn(destination, chunk); + if (!keepWriting) { + request.destination = null; + i++; + break; + } } + errorChunks.splice(0, i); + } finally { + request.flushScheduled = false; + completeWriting(destination); } - errorChunks.splice(0, i); - } finally { - request.flushScheduled = false; - completeWriting(destination); + flushBuffered(destination); } - flushBuffered(destination); - if ( - request.pendingChunks === 0 && - (!__DEV__ || request.pendingDebugChunks === 0) - ) { + if (request.pendingChunks === 0) { + if (__DEV__) { + const debugDestination = request.debugDestination; + if (request.pendingDebugChunks === 0) { + // Continue fully closing both streams. + if (debugDestination !== null) { + close(debugDestination); + request.debugDestination = null; + } + } else { + // We still have debug information to write. + if (debugDestination === null) { + // We'll continue writing on this stream so nothing closes. + return; + } else { + // We'll close the main stream but keep the debug stream open. + // TODO: If this destination is not currently flowing we'll not close it when it resumes flowing. + // We should keep a separate status for this. + if (request.destination !== null) { + close(request.destination); + request.destination = null; + } + return; + } + } + } // We're done. if (enableTaint) { cleanupTaintQueue(request); @@ -5612,8 +5670,14 @@ function flushCompletedChunks( request.cacheController.abort(abortReason); } request.status = CLOSED; - close(destination); - request.destination = null; + if (request.destination !== null) { + close(request.destination); + request.destination = null; + } + if (__DEV__ && request.debugDestination !== null) { + close(request.debugDestination); + request.debugDestination = null; + } } } @@ -5640,17 +5704,15 @@ function enqueueFlush(request: Request): void { request.pingedTasks.length === 0 && // If there is no destination there is nothing we can flush to. A flush will // happen when we start flowing again - request.destination !== null + (request.destination !== null || + (__DEV__ && request.debugDestination !== null)) ) { request.flushScheduled = true; // Unlike startWork and pingTask we intetionally use scheduleWork // here even during prerenders to allow as much batching as possible scheduleWork(() => { request.flushScheduled = false; - const destination = request.destination; - if (destination) { - flushCompletedChunks(request, destination); - } + flushCompletedChunks(request); }); } } @@ -5677,7 +5739,32 @@ export function startFlowing(request: Request, destination: Destination): void { } request.destination = destination; try { - flushCompletedChunks(request, destination); + flushCompletedChunks(request); + } catch (error) { + logRecoverableError(request, error, null); + fatalError(request, error); + } +} + +export function startFlowingDebug( + request: Request, + debugDestination: Destination, +): void { + if (request.status === CLOSING) { + request.status = CLOSED; + closeWithError(debugDestination, request.fatalError); + return; + } + if (request.status === CLOSED) { + return; + } + if (request.debugDestination !== null) { + // We're already flowing. + return; + } + request.debugDestination = debugDestination; + try { + flushCompletedChunks(request); } catch (error) { logRecoverableError(request, error, null); fatalError(request, error); @@ -5693,9 +5780,7 @@ function finishHalt(request: Request, abortedTasks: Set): void { abortedTasks.forEach(task => finishHaltedTask(task, request)); const onAllReady = request.onAllReady; onAllReady(); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + flushCompletedChunks(request); } catch (error) { logRecoverableError(request, error, null); fatalError(request, error); @@ -5711,9 +5796,7 @@ function finishAbort( abortedTasks.forEach(task => finishAbortedTask(task, request, errorId)); const onAllReady = request.onAllReady; onAllReady(); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + flushCompletedChunks(request); } catch (error) { logRecoverableError(request, error, null); fatalError(request, error); @@ -5780,9 +5863,7 @@ export function abort(request: Request, reason: mixed): void { } else { const onAllReady = request.onAllReady; onAllReady(); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + flushCompletedChunks(request); } } catch (error) { logRecoverableError(request, error, null);