Skip to content

Commit e13d9ef

Browse files
committed
stream: manual destroy IncomingRequest on pipeline
1 parent 32f7218 commit e13d9ef

File tree

4 files changed

+132
-1
lines changed

4 files changed

+132
-1
lines changed

lib/_http_server.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ const onResponseFinishChannel = dc.channel('http.server.response.finish');
9292
const kServerResponse = Symbol('ServerResponse');
9393
const kServerResponseStatistics = Symbol('ServerResponseStatistics');
9494

95+
const {
96+
kManualDestroy,
97+
kPipelineStream
98+
} = require('internal/streams/utils');
99+
95100
const {
96101
hasObserver,
97102
} = require('internal/perf/observe');
@@ -234,6 +239,10 @@ function onServerResponseClose() {
234239
}
235240
}
236241

242+
ServerResponse.prototype[kPipelineStream] = function() {
243+
this[kManualDestroy] = true;
244+
};
245+
237246
ServerResponse.prototype.assignSocket = function assignSocket(socket) {
238247
assert(!socket._httpMessage);
239248
socket._httpMessage = this;

lib/internal/streams/pipeline.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ const {
3333
isIterable,
3434
isReadableNodeStream,
3535
isNodeStream,
36+
kManualDestroy,
37+
kPipelineStream,
3638
} = require('internal/streams/utils');
3739
const { AbortController } = require('internal/abort_controller');
3840

@@ -52,7 +54,9 @@ function destroyer(stream, reading, writing) {
5254
return (err) => {
5355
if (finished) return;
5456
finished = true;
55-
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
57+
if (stream[kManualDestroy] !== true) {
58+
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
59+
}
5660
};
5761
}
5862

@@ -205,6 +209,10 @@ function pipelineImpl(streams, callback, opts) {
205209
const writing = i > 0;
206210
const end = reading || opts?.end !== false;
207211

212+
if (stream[kPipelineStream]) {
213+
stream[kPipelineStream]();
214+
}
215+
208216
if (isNodeStream(stream)) {
209217
if (end) {
210218
destroys.push(destroyer(stream, reading, writing));

lib/internal/streams/utils.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ const {
88

99
const kDestroyed = Symbol('kDestroyed');
1010
const kIsDisturbed = Symbol('kIsDisturbed');
11+
const kManualDestroy = Symbol('kManualDestroy');
12+
const kPipelineStream = Symbol('kPipelineStream');
1113

1214
function isReadableNodeStream(obj, strict = false) {
1315
return !!(
@@ -247,6 +249,8 @@ function isDisturbed(stream) {
247249

248250
module.exports = {
249251
kDestroyed,
252+
kManualDestroy,
253+
kPipelineStream,
250254
isDisturbed,
251255
kIsDisturbed,
252256
isClosed,

test/parallel/test-stream-pipeline.js

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const http = require('http');
1717
const { promisify } = require('util');
1818
const net = require('net');
1919
const tsp = require('timers/promises');
20+
const fs = require('fs');
2021

2122
{
2223
let finished = false;
@@ -1512,3 +1513,112 @@ const tsp = require('timers/promises');
15121513
assert.strictEqual(s.destroyed, true);
15131514
}));
15141515
}
1516+
1517+
{
1518+
const r = new Readable({
1519+
read() {}
1520+
});
1521+
r.push('hello');
1522+
r.push('world');
1523+
r.push(null);
1524+
let res = '';
1525+
const w = new Writable({
1526+
write(chunk, encoding, callback) {
1527+
res += chunk;
1528+
callback();
1529+
}
1530+
});
1531+
pipeline([r, w], common.mustCall((err) => {
1532+
assert.ok(r.destroyed)
1533+
assert.ok(w.destroyed)
1534+
assert.ok(!err);
1535+
assert.strictEqual(res, 'helloworld');
1536+
}));
1537+
}
1538+
1539+
{
1540+
const r = new Readable({
1541+
read() {}
1542+
});
1543+
r.push('hello');
1544+
r.push('world');
1545+
r.push(null);
1546+
let res = '';
1547+
const w = new Writable({
1548+
write(chunk, encoding, callback) {
1549+
res += chunk;
1550+
callback();
1551+
}
1552+
});
1553+
pipeline([r, w], common.mustCall((err) => {
1554+
assert.ok(r.destroyed)
1555+
assert.ok(w.destroyed)
1556+
assert.ok(!err);
1557+
assert.strictEqual(res, 'helloworld');
1558+
}));
1559+
}
1560+
1561+
// When occurs an error in the pipeline the IncomingRequest should not destroy the connection automatically.
1562+
{
1563+
const server = http.createServer(common.mustCall((req, res) => {
1564+
const r = fs.createReadStream('./notfound');
1565+
pipeline(r, res, common.mustCall((err) => {
1566+
assert.ok(res.destroyed === false);
1567+
assert.ok(r.destroyed);
1568+
assert.strictEqual(err.code, 'ENOENT');
1569+
assert.strictEqual(err.message,
1570+
'ENOENT: no such file or directory, ' +
1571+
'open \'./notfound\'');
1572+
res.end(err.message);
1573+
}));
1574+
}));
1575+
1576+
server.listen(0, common.mustCall(() => {
1577+
http.request({
1578+
port: server.address().port
1579+
}, common.mustCall((res) => {
1580+
res.setEncoding('utf8');
1581+
let responseData = '';
1582+
res.on('data', (chunk) => { responseData += chunk; });
1583+
res.on('end', common.mustCall(() => {
1584+
assert.strictEqual(responseData,
1585+
'ENOENT: no such file or directory, ' +
1586+
'open \'./notfound\'');
1587+
setImmediate(() => {
1588+
res.destroy();
1589+
server.close();
1590+
});
1591+
}));
1592+
})).end();
1593+
}));
1594+
}
1595+
1596+
// Should close the IncomingRequest stream automatically when no error occurs
1597+
{
1598+
const server = http.createServer(common.mustCall((req, res) => {
1599+
const r = fs.createReadStream(__filename);
1600+
pipeline(r, res, common.mustCall((err) => {
1601+
assert.ok(res.destroyed);
1602+
assert.ok(r.destroyed);
1603+
assert.ok(err === undefined)
1604+
}));
1605+
}));
1606+
1607+
server.listen(0, common.mustCall(() => {
1608+
http.request({
1609+
port: server.address().port
1610+
}, common.mustCall((res) => {
1611+
res.setEncoding('utf8');
1612+
let responseData = '';
1613+
res.on('data', (chunk) => { responseData += chunk; });
1614+
res.on('end', common.mustCall(() => {
1615+
const data = fs.readFileSync(__filename)
1616+
assert.strictEqual(responseData, data.toString());
1617+
setImmediate(() => {
1618+
res.destroy();
1619+
server.close();
1620+
});
1621+
}));
1622+
})).end();
1623+
}));
1624+
}

0 commit comments

Comments
 (0)