Skip to content

Commit e101b1d

Browse files
committed
fixup
1 parent dccc0e7 commit e101b1d

File tree

3 files changed

+223
-55
lines changed

3 files changed

+223
-55
lines changed

lib/internal/streams/destroy.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,8 @@ function destroyer(stream, err) {
356356
}
357357

358358
module.exports = {
359+
kConstruct,
360+
kDestroy,
359361
construct,
360362
destroyer,
361363
destroy,

lib/internal/streams/readable.js

Lines changed: 216 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const {
3535
SymbolAsyncIterator,
3636
Symbol,
3737
TypeError,
38+
Uint8Array,
3839
} = primordials;
3940

4041
module.exports = Readable;
@@ -45,6 +46,8 @@ const { Stream, prependListener } = require('internal/streams/legacy');
4546
const { Buffer } = require('buffer');
4647

4748
let Blob;
49+
let ReadableStream;
50+
let CountQueuingStrategy;
4851

4952
const {
5053
addAbortSignal,
@@ -75,9 +78,11 @@ const { validateObject } = require('internal/validators');
7578

7679
const kPaused = Symbol('kPaused');
7780
const kConsume = Symbol('kConsume');
81+
const kReading = Symbol('kReading');
7882

7983
const { StringDecoder } = require('string_decoder');
8084
const from = require('internal/streams/from');
85+
const assert = require('internal/assert');
8186

8287
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
8388
ObjectSetPrototypeOf(Readable, Stream);
@@ -213,6 +218,7 @@ function Readable(options) {
213218
}
214219

215220
this[kConsume] = null;
221+
this[kReading] = false; // Is stream being consumed through Readable API?
216222

217223
Stream.call(this, options);
218224

@@ -238,6 +244,11 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) {
238244
// similar to how Writable.write() returns true if you should
239245
// write() some more.
240246
Readable.prototype.push = function(chunk, encoding) {
247+
if (this[kConsume] && chunk !== null && !this[kReading]) {
248+
encoding = encoding || this._readableState.defaultEncoding;
249+
return this[kConsume].push(chunk, encoding);
250+
}
251+
241252
return readableAddChunk(this, chunk, encoding, false);
242253
};
243254

@@ -307,10 +318,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
307318
maybeReadMore(stream, state);
308319
}
309320

321+
const consumed = this[kConsume] ? this[kConsume].push(chunk, encoding) : true;
322+
310323
// We can push more data if we are below the highWaterMark.
311324
// Also, if we have no data yet, we can stand some more bytes.
312325
// This is to work around cases where hwm=0, such as the repl.
313-
return !state.ended &&
326+
return consumed && !state.ended &&
314327
(state.length < state.highWaterMark || state.length === 0);
315328
}
316329

@@ -402,6 +415,27 @@ function howMuchToRead(n, state) {
402415
return state.ended ? state.length : 0;
403416
}
404417

418+
419+
function _read(self, n) {
420+
// Call internal read method
421+
try {
422+
const result = self._read(n);
423+
if (result != null) {
424+
const then = result.then;
425+
if (typeof then === 'function') {
426+
then.call(
427+
result,
428+
nop,
429+
function(err) {
430+
errorOrDestroy(self, err);
431+
});
432+
}
433+
}
434+
} catch (err) {
435+
errorOrDestroy(self, err);
436+
}
437+
}
438+
405439
// You can override either this method, or the async _read(n) below.
406440
Readable.prototype.read = function(n) {
407441
debug('read', n);
@@ -496,22 +530,7 @@ Readable.prototype.read = function(n) {
496530
state.needReadable = true;
497531

498532
// Call internal read method
499-
try {
500-
const result = this._read(state.highWaterMark);
501-
if (result != null) {
502-
const then = result.then;
503-
if (typeof then === 'function') {
504-
then.call(
505-
result,
506-
nop,
507-
function(err) {
508-
errorOrDestroy(this, err);
509-
});
510-
}
511-
}
512-
} catch (err) {
513-
errorOrDestroy(this, err);
514-
}
533+
_read(this, state.highWaterMark);
515534

516535
state.sync = false;
517536
// If _read pushed data synchronously, then `reading` will be false,
@@ -906,6 +925,8 @@ Readable.prototype.on = function(ev, fn) {
906925
const state = this._readableState;
907926

908927
if (ev === 'data') {
928+
this[kReading] = true;
929+
909930
// Update readableListening so that resume() may be a no-op
910931
// a few lines down. This is needed to support once('readable').
911932
state.readableListening = this.listenerCount('readable') > 0;
@@ -914,6 +935,8 @@ Readable.prototype.on = function(ev, fn) {
914935
if (state.flowing !== false)
915936
this.resume();
916937
} else if (ev === 'readable') {
938+
this[kReading] = true;
939+
917940
if (!state.endEmitted && !state.readableListening) {
918941
state.readableListening = state.needReadable = true;
919942
state.flowing = false;
@@ -1310,7 +1333,7 @@ ObjectDefineProperties(ReadableState.prototype, {
13101333
body: {
13111334
get() {
13121335
if (this[kConsume]?.type === kWebStreamType) {
1313-
return this[kConsume].body;
1336+
return this[kConsume].stream;
13141337
}
13151338

13161339
return consume(this, kWebStreamType);
@@ -1343,8 +1366,7 @@ ObjectDefineProperties(ReadableState.prototype, {
13431366
});
13441367

13451368
function isLocked(self) {
1346-
return self[kConsume] &&
1347-
(self[kConsume].type !== kWebStreamType || self[kConsume].body.locked);
1369+
return self[kConsume]?.stream?.locked === true;
13481370
}
13491371

13501372
// https://streams.spec.whatwg.org/#readablestream-disturbed
@@ -1363,56 +1385,194 @@ function consume(self, type) {
13631385
}
13641386

13651387
if (type === kWebStreamType) {
1366-
self[kConsume] = {
1388+
if (!ReadableStream) {
1389+
ReadableStream = require('internal/webstreams/readablestream')
1390+
.ReadableStream;
1391+
}
1392+
1393+
const objectMode = self.readableObjectMode;
1394+
const highWaterMark = self.readableHighWaterMark;
1395+
// When not running in objectMode explicitly, we just fall
1396+
// back to a minimal strategy that just specifies the highWaterMark
1397+
// and no size algorithm. Using a ByteLengthQueuingStrategy here
1398+
// is unnecessary.
1399+
let strategy;
1400+
if (objectMode) {
1401+
if (!CountQueuingStrategy) {
1402+
CountQueuingStrategy = require('internal/webstreams/queuingstrategies');
1403+
}
1404+
strategy = new CountQueuingStrategy({ highWaterMark });
1405+
} else {
1406+
strategy = { highWaterMark };
1407+
}
1408+
1409+
1410+
self
1411+
.on('error', function(err) {
1412+
const { controller } = this[kConsume];
1413+
controller.error(err);
1414+
})
1415+
.on('close', function() {
1416+
const { controller } = this[kConsume];
1417+
if (controller) {
1418+
controller.error(new AbortError());
1419+
}
1420+
});
1421+
1422+
const consume = self[kConsume] = {
13671423
type,
1368-
body: Readable.toWeb(self)
1424+
objectMode,
1425+
controller: null,
1426+
push(chunk) {
1427+
const { objectMode, controller } = this;
1428+
1429+
assert(controller);
1430+
1431+
if (chunk === null) {
1432+
controller.close();
1433+
this.controller = null;
1434+
} else {
1435+
if (!objectMode) {
1436+
if (typeof chunk === 'string') {
1437+
chunk = new Uint8Array(Buffer.from(chunk));
1438+
} else if (Buffer.isBuffer(chunk)) {
1439+
// Copy the Buffer to detach it from the pool.
1440+
chunk = new Uint8Array(chunk);
1441+
} else if (Stream._isUint8Array(chunk)) {
1442+
// Do nothing...
1443+
} else if (chunk != null) {
1444+
throw new ERR_INVALID_ARG_TYPE(
1445+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
1446+
}
1447+
}
1448+
1449+
// TODO: Does controller perform any type checks?
1450+
controller.enqueue(chunk);
1451+
}
1452+
1453+
return controller.desiredSize > 0;
1454+
},
1455+
stream: new ReadableStream({
1456+
async start(controller) {
1457+
consume.controller = controller;
1458+
1459+
const { _readableState: state } = self;
1460+
1461+
if (self[kReading]) {
1462+
while (controller.desiredSize > 0) {
1463+
const chunk = self.read();
1464+
if (chunk === null) {
1465+
break;
1466+
}
1467+
controller.enqueue(chunk);
1468+
}
1469+
} else {
1470+
const buffer = state.buffer;
1471+
while (buffer.length) {
1472+
controller.enqueue(buffer.shift());
1473+
}
1474+
state.lenth = 0;
1475+
}
1476+
1477+
if (state.ended) {
1478+
controller.close();
1479+
}
1480+
1481+
if (!state.constructed) {
1482+
await EE.once(destroyImpl.kConstruct, self);
1483+
}
1484+
},
1485+
pull() {
1486+
const { _readableState: state } = self;
1487+
1488+
const n = consume.controller.desiredSize;
1489+
1490+
if (self[kReading]) {
1491+
assert(state.length === 0);
1492+
self.read(n);
1493+
} else {
1494+
_read(self, n);
1495+
}
1496+
},
1497+
cancel(reason) {
1498+
self.destroy(reason);
1499+
},
1500+
}, strategy)
13691501
};
13701502

1371-
return self[kConsume].body;
1503+
return consume.stream;
13721504
}
13731505

13741506
return new Promise((resolve, reject) => {
13751507
self[kConsume] = {
13761508
type,
13771509
resolve,
13781510
reject,
1379-
body: type === kTextType || type === kJSONType ? '' : []
1380-
};
1381-
self
1382-
.on('error', reject)
1383-
.on('data', function(val) {
1384-
const { type } = this[kConsume];
1511+
decoder: null,
1512+
body: type === kTextType || type === kJSONType ? '' : [],
1513+
push(chunk, encoding) {
1514+
const { type, body, resolve, decoder } = this[kConsume];
1515+
1516+
if (chunk === null) {
1517+
try {
1518+
if (type === kTextType) {
1519+
resolve(body + (decoder ? decoder.end() : ''));
1520+
} else if (type === kJSONType) {
1521+
resolve(JSONParse(body + (decoder ? decoder.end() : '')));
1522+
} else if (type === kArrayBufferType) {
1523+
resolve(Buffer.concat(body).buffer);
1524+
} else if (type === kBlobType) {
1525+
if (!Blob) {
1526+
Blob = require('buffer').Blob;
1527+
}
1528+
resolve(new Blob(body));
1529+
}
13851530

1386-
// TODO (fix): Do we need type check and/or conversion?
1531+
this[kConsume].body = null;
1532+
} catch (err) {
1533+
self.destroy(err);
1534+
}
1535+
} else if (type === kTextType || type === kJSONType) {
1536+
if (typeof chunk === 'string') {
1537+
if (decoder) {
1538+
chunk = decoder.write(Buffer.from(chunk));
1539+
}
1540+
// TODO: Encoding check/transform?
1541+
} else if (chunk instanceof Buffer) {
1542+
if (!decoder) {
1543+
this[kConsume].decoder = new StringDecoder('utf8');
1544+
}
1545+
encoding = decoder.write(chunk);
1546+
} else if (Stream._isUint8Array(chunk)) {
1547+
encoding = decoder.write(Stream._uint8ArrayToBuffer(chunk));
1548+
} else {
1549+
throw new ERR_INVALID_ARG_TYPE(
1550+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
1551+
}
13871552

1388-
if (type === kTextType || type === kJSONType) {
1389-
this[kConsume].body += val;
1553+
this[kConsume].body += chunk;
13901554
} else {
1391-
this[kConsume].body.push(val);
1392-
}
1393-
})
1394-
.on('end', function() {
1395-
const { type, resolve, body } = this[kConsume];
1396-
1397-
try {
1398-
if (type === kTextType) {
1399-
resolve(body);
1400-
} else if (type === kJSONType) {
1401-
resolve(JSONParse(body));
1402-
} else if (type === kArrayBufferType) {
1403-
resolve(Buffer.concat(body).buffer);
1404-
} else if (type === kBlobType) {
1405-
if (!Blob) {
1406-
Blob = require('buffer').Blob;
1407-
}
1408-
resolve(new Blob(body));
1555+
if (typeof chunk === 'string') {
1556+
chunk = Buffer.from(chunk);
1557+
// TODO: Encoding check/transform?
1558+
} else if (chunk instanceof Buffer) {
1559+
// Do nothing...
1560+
} else if (Stream._isUint8Array(chunk)) {
1561+
chunk = Stream._uint8ArrayToBuffer(chunk);
1562+
} else {
1563+
throw new ERR_INVALID_ARG_TYPE(
1564+
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
14091565
}
14101566

1411-
this[kConsume].body = null;
1412-
} catch (err) {
1413-
self.destroy(err);
1567+
this[kConsume].body.push(chunk);
14141568
}
1415-
})
1569+
1570+
return true;
1571+
}
1572+
};
1573+
1574+
self
1575+
.on('error', reject)
14161576
.on('close', function() {
14171577
const { body, reject } = this[kConsume];
14181578

@@ -1522,5 +1682,6 @@ Readable.fromWeb = function(readableStream, options) {
15221682
};
15231683

15241684
Readable.toWeb = function(streamReadable) {
1525-
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
1685+
return streamReadable[kConsume] !== undefined ? streamReadable.stream :
1686+
lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
15261687
};

0 commit comments

Comments
 (0)