Skip to content

Commit eb7f8b4

Browse files
authored
[Flight] Add Separate Outgoing Debug Channel (facebook#33754)
This lets us pass a writable on the server side and readable on the client side to send debug info through a separate channel so that it doesn't interfere with the main payload as much. The main payload refers to chunks defined in the debug info which means it's still blocked on it though. This ensures that the debug data has loaded by the time the value is rendered so that the next step can forward the data. This will be a bit fragile to race conditions until facebook#33665 lands. Another follow up needed is the ability to skip the debug channel on the receiving side. Right now it'll block forever if you don't provide one since we're blocking on the debug data.
1 parent eed2560 commit eb7f8b4

25 files changed

+997
-240
lines changed

packages/react-client/src/ReactFlightClient.js

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,6 @@ type Response = {
342342
_chunks: Map<number, SomeChunk<any>>,
343343
_fromJSON: (key: string, value: JSONValue) => any,
344344
_stringDecoder: StringDecoder,
345-
_rowState: RowParserState,
346-
_rowID: number, // parts of a row ID parsed so far
347-
_rowTag: number, // 0 indicates that we're currently parsing the row ID
348-
_rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline.
349-
_buffer: Array<Uint8Array>, // chunks received so far as part of this row
350345
_closed: boolean,
351346
_closedReason: mixed,
352347
_tempRefs: void | TemporaryReferenceSet, // the set temporary references can be resolved from
@@ -2154,11 +2149,6 @@ function ResponseInstance(
21542149
this._chunks = chunks;
21552150
this._stringDecoder = createStringDecoder();
21562151
this._fromJSON = (null: any);
2157-
this._rowState = 0;
2158-
this._rowID = 0;
2159-
this._rowTag = 0;
2160-
this._rowLength = 0;
2161-
this._buffer = [];
21622152
this._closed = false;
21632153
this._closedReason = null;
21642154
this._tempRefs = temporaryReferences;
@@ -2259,6 +2249,24 @@ export function createResponse(
22592249
);
22602250
}
22612251

2252+
export type StreamState = {
2253+
_rowState: RowParserState,
2254+
_rowID: number, // parts of a row ID parsed so far
2255+
_rowTag: number, // 0 indicates that we're currently parsing the row ID
2256+
_rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline.
2257+
_buffer: Array<Uint8Array>, // chunks received so far as part of this row
2258+
};
2259+
2260+
export function createStreamState(): StreamState {
2261+
return {
2262+
_rowState: 0,
2263+
_rowID: 0,
2264+
_rowTag: 0,
2265+
_rowLength: 0,
2266+
_buffer: [],
2267+
};
2268+
}
2269+
22622270
function resolveDebugHalt(response: Response, id: number): void {
22632271
const chunks = response._chunks;
22642272
let chunk = chunks.get(id);
@@ -3995,6 +4003,7 @@ function processFullStringRow(
39954003

39964004
export function processBinaryChunk(
39974005
weakResponse: WeakResponse,
4006+
streamState: StreamState,
39984007
chunk: Uint8Array,
39994008
): void {
40004009
if (hasGCedResponse(weakResponse)) {
@@ -4003,11 +4012,11 @@ export function processBinaryChunk(
40034012
}
40044013
const response = unwrapWeakResponse(weakResponse);
40054014
let i = 0;
4006-
let rowState = response._rowState;
4007-
let rowID = response._rowID;
4008-
let rowTag = response._rowTag;
4009-
let rowLength = response._rowLength;
4010-
const buffer = response._buffer;
4015+
let rowState = streamState._rowState;
4016+
let rowID = streamState._rowID;
4017+
let rowTag = streamState._rowTag;
4018+
let rowLength = streamState._rowLength;
4019+
const buffer = streamState._buffer;
40114020
const chunkLength = chunk.length;
40124021
while (i < chunkLength) {
40134022
let lastIdx = -1;
@@ -4112,14 +4121,15 @@ export function processBinaryChunk(
41124121
break;
41134122
}
41144123
}
4115-
response._rowState = rowState;
4116-
response._rowID = rowID;
4117-
response._rowTag = rowTag;
4118-
response._rowLength = rowLength;
4124+
streamState._rowState = rowState;
4125+
streamState._rowID = rowID;
4126+
streamState._rowTag = rowTag;
4127+
streamState._rowLength = rowLength;
41194128
}
41204129

41214130
export function processStringChunk(
41224131
weakResponse: WeakResponse,
4132+
streamState: StreamState,
41234133
chunk: string,
41244134
): void {
41254135
if (hasGCedResponse(weakResponse)) {
@@ -4136,11 +4146,11 @@ export function processStringChunk(
41364146
// here. Basically, only if Flight Server gave you this string as a chunk,
41374147
// you can use it here.
41384148
let i = 0;
4139-
let rowState = response._rowState;
4140-
let rowID = response._rowID;
4141-
let rowTag = response._rowTag;
4142-
let rowLength = response._rowLength;
4143-
const buffer = response._buffer;
4149+
let rowState = streamState._rowState;
4150+
let rowID = streamState._rowID;
4151+
let rowTag = streamState._rowTag;
4152+
let rowLength = streamState._rowLength;
4153+
const buffer = streamState._buffer;
41444154
const chunkLength = chunk.length;
41454155
while (i < chunkLength) {
41464156
let lastIdx = -1;
@@ -4264,10 +4274,10 @@ export function processStringChunk(
42644274
);
42654275
}
42664276
}
4267-
response._rowState = rowState;
4268-
response._rowID = rowID;
4269-
response._rowTag = rowTag;
4270-
response._rowLength = rowLength;
4277+
streamState._rowState = rowState;
4278+
streamState._rowID = rowID;
4279+
streamState._rowTag = rowTag;
4280+
streamState._rowLength = rowLength;
42714281
}
42724282

42734283
function parseModel<T>(response: Response, json: UninitializedModel): T {

packages/react-markup/src/ReactMarkupServer.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525

2626
import {
2727
createResponse as createFlightResponse,
28+
createStreamState as createFlightStreamState,
2829
getRoot as getFlightRoot,
2930
processStringChunk as processFlightStringChunk,
3031
close as closeFlight,
@@ -80,10 +81,11 @@ export function experimental_renderToHTML(
8081
options?: MarkupOptions,
8182
): Promise<string> {
8283
return new Promise((resolve, reject) => {
84+
const streamState = createFlightStreamState();
8385
const flightDestination = {
8486
push(chunk: string | null): boolean {
8587
if (chunk !== null) {
86-
processFlightStringChunk(flightResponse, chunk);
88+
processFlightStringChunk(flightResponse, streamState, chunk);
8789
} else {
8890
closeFlight(flightResponse);
8991
}

packages/react-noop-renderer/src/ReactNoopFlightClient.js

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,35 +24,36 @@ type Source = Array<Uint8Array>;
2424

2525
const decoderOptions = {stream: true};
2626

27-
const {createResponse, processBinaryChunk, getRoot, close} = ReactFlightClient({
28-
createStringDecoder() {
29-
return new TextDecoder();
30-
},
31-
readPartialStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
32-
return decoder.decode(buffer, decoderOptions);
33-
},
34-
readFinalStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
35-
return decoder.decode(buffer);
36-
},
37-
resolveClientReference(bundlerConfig: null, idx: string) {
38-
return idx;
39-
},
40-
prepareDestinationForModule(moduleLoading: null, metadata: string) {},
41-
preloadModule(idx: string) {},
42-
requireModule(idx: string) {
43-
return readModule(idx);
44-
},
45-
parseModel(response: Response, json) {
46-
return JSON.parse(json, response._fromJSON);
47-
},
48-
bindToConsole(methodName, args, badgeName) {
49-
return Function.prototype.bind.apply(
50-
// eslint-disable-next-line react-internal/no-production-logging
51-
console[methodName],
52-
[console].concat(args),
53-
);
54-
},
55-
});
27+
const {createResponse, createStreamState, processBinaryChunk, getRoot, close} =
28+
ReactFlightClient({
29+
createStringDecoder() {
30+
return new TextDecoder();
31+
},
32+
readPartialStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
33+
return decoder.decode(buffer, decoderOptions);
34+
},
35+
readFinalStringChunk(decoder: TextDecoder, buffer: Uint8Array): string {
36+
return decoder.decode(buffer);
37+
},
38+
resolveClientReference(bundlerConfig: null, idx: string) {
39+
return idx;
40+
},
41+
prepareDestinationForModule(moduleLoading: null, metadata: string) {},
42+
preloadModule(idx: string) {},
43+
requireModule(idx: string) {
44+
return readModule(idx);
45+
},
46+
parseModel(response: Response, json) {
47+
return JSON.parse(json, response._fromJSON);
48+
},
49+
bindToConsole(methodName, args, badgeName) {
50+
return Function.prototype.bind.apply(
51+
// eslint-disable-next-line react-internal/no-production-logging
52+
console[methodName],
53+
[console].concat(args),
54+
);
55+
},
56+
});
5657

5758
type ReadOptions = {|
5859
findSourceMapURL?: FindSourceMapURLCallback,
@@ -76,8 +77,9 @@ function read<T>(source: Source, options: ReadOptions): Thenable<T> {
7677
? options.debugChannel.onMessage
7778
: undefined,
7879
);
80+
const streamState = createStreamState();
7981
for (let i = 0; i < source.length; i++) {
80-
processBinaryChunk(response, source[i], 0);
82+
processBinaryChunk(response, streamState, source[i], 0);
8183
}
8284
if (options !== undefined && options.close) {
8385
close(response);

packages/react-server-dom-esm/src/client/ReactFlightDOMClientBrowser.js

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import type {ReactServerValue} from 'react-client/src/ReactFlightReplyClient';
1919

2020
import {
2121
createResponse,
22+
createStreamState,
2223
getRoot,
2324
reportGlobalError,
2425
processBinaryChunk,
26+
processStringChunk,
2527
close,
2628
injectIntoDevTools,
2729
} from 'react-client/src/ReactFlightClient';
@@ -44,7 +46,7 @@ type CallServerCallback = <A, T>(string, args: A) => Promise<T>;
4446
export type Options = {
4547
moduleBaseURL?: string,
4648
callServer?: CallServerCallback,
47-
debugChannel?: {writable?: WritableStream, ...},
49+
debugChannel?: {writable?: WritableStream, readable?: ReadableStream, ...},
4850
temporaryReferences?: TemporaryReferenceSet,
4951
findSourceMapURL?: FindSourceMapURLCallback,
5052
replayConsoleLogs?: boolean,
@@ -96,10 +98,50 @@ function createResponseFromOptions(options: void | Options) {
9698
);
9799
}
98100

101+
function startReadingFromUniversalStream(
102+
response: FlightResponse,
103+
stream: ReadableStream,
104+
): void {
105+
// This is the same as startReadingFromStream except this allows WebSocketStreams which
106+
// return ArrayBuffer and string chunks instead of Uint8Array chunks. We could potentially
107+
// always allow streams with variable chunk types.
108+
const streamState = createStreamState();
109+
const reader = stream.getReader();
110+
function progress({
111+
done,
112+
value,
113+
}: {
114+
done: boolean,
115+
value: any,
116+
...
117+
}): void | Promise<void> {
118+
if (done) {
119+
close(response);
120+
return;
121+
}
122+
if (value instanceof ArrayBuffer) {
123+
// WebSockets can produce ArrayBuffer values in ReadableStreams.
124+
processBinaryChunk(response, streamState, new Uint8Array(value));
125+
} else if (typeof value === 'string') {
126+
// WebSockets can produce string values in ReadableStreams.
127+
processStringChunk(response, streamState, value);
128+
} else {
129+
processBinaryChunk(response, streamState, value);
130+
}
131+
return reader.read().then(progress).catch(error);
132+
}
133+
function error(e: any) {
134+
reportGlobalError(response, e);
135+
}
136+
reader.read().then(progress).catch(error);
137+
}
138+
99139
function startReadingFromStream(
100140
response: FlightResponse,
101141
stream: ReadableStream,
142+
isSecondaryStream: boolean,
102143
): void {
144+
const streamState = createStreamState();
103145
const reader = stream.getReader();
104146
function progress({
105147
done,
@@ -110,25 +152,37 @@ function startReadingFromStream(
110152
...
111153
}): void | Promise<void> {
112154
if (done) {
113-
close(response);
155+
// If we're the secondary stream, then we don't close the response until the debug channel closes.
156+
if (!isSecondaryStream) {
157+
close(response);
158+
}
114159
return;
115160
}
116161
const buffer: Uint8Array = (value: any);
117-
processBinaryChunk(response, buffer);
162+
processBinaryChunk(response, streamState, buffer);
118163
return reader.read().then(progress).catch(error);
119164
}
120165
function error(e: any) {
121166
reportGlobalError(response, e);
122167
}
123168
reader.read().then(progress).catch(error);
124169
}
125-
126170
function createFromReadableStream<T>(
127171
stream: ReadableStream,
128172
options?: Options,
129173
): Thenable<T> {
130174
const response: FlightResponse = createResponseFromOptions(options);
131-
startReadingFromStream(response, stream);
175+
if (
176+
__DEV__ &&
177+
options &&
178+
options.debugChannel &&
179+
options.debugChannel.readable
180+
) {
181+
startReadingFromUniversalStream(response, options.debugChannel.readable);
182+
startReadingFromStream(response, stream, true);
183+
} else {
184+
startReadingFromStream(response, stream, false);
185+
}
132186
return getRoot(response);
133187
}
134188

@@ -139,7 +193,20 @@ function createFromFetch<T>(
139193
const response: FlightResponse = createResponseFromOptions(options);
140194
promiseForResponse.then(
141195
function (r) {
142-
startReadingFromStream(response, (r.body: any));
196+
if (
197+
__DEV__ &&
198+
options &&
199+
options.debugChannel &&
200+
options.debugChannel.readable
201+
) {
202+
startReadingFromUniversalStream(
203+
response,
204+
options.debugChannel.readable,
205+
);
206+
startReadingFromStream(response, (r.body: any), true);
207+
} else {
208+
startReadingFromStream(response, (r.body: any), false);
209+
}
143210
},
144211
function (e) {
145212
reportGlobalError(response, e);

packages/react-server-dom-esm/src/client/ReactFlightDOMClientNode.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import type {Readable} from 'stream';
1818

1919
import {
2020
createResponse,
21+
createStreamState,
2122
getRoot,
2223
reportGlobalError,
24+
processStringChunk,
2325
processBinaryChunk,
2426
close,
2527
} from 'react-client/src/ReactFlightClient';
@@ -78,8 +80,13 @@ function createFromNodeStream<T>(
7880
? options.environmentName
7981
: undefined,
8082
);
83+
const streamState = createStreamState();
8184
stream.on('data', chunk => {
82-
processBinaryChunk(response, chunk);
85+
if (typeof chunk === 'string') {
86+
processStringChunk(response, streamState, chunk);
87+
} else {
88+
processBinaryChunk(response, streamState, chunk);
89+
}
8390
});
8491
stream.on('error', error => {
8592
reportGlobalError(response, error);

0 commit comments

Comments
 (0)