Skip to content

Commit d2ac192

Browse files
authored
stream: add reduce
PR-URL: #41669 Reviewed-By: Matteo Collina <[email protected]>
1 parent 0018ee1 commit d2ac192

File tree

5 files changed

+246
-16
lines changed

5 files changed

+246
-16
lines changed

doc/api/stream.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2142,6 +2142,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
21422142
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
21432143
```
21442144

2145+
### `readable.reduce(fn[, initial[, options]])`
2146+
2147+
<!-- YAML
2148+
added: REPLACEME
2149+
-->
2150+
2151+
> Stability: 1 - Experimental
2152+
2153+
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
2154+
in the stream.
2155+
* `previous` {any} the value obtained from the last call to `fn` or the
2156+
`initial` value if specified or the first chunk of the stream otherwise.
2157+
* `data` {any} a chunk of data from the stream.
2158+
* `options` {Object}
2159+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2160+
abort the `fn` call early.
2161+
* `initial` {any} the initial value to use in the reduction.
2162+
* `options` {Object}
2163+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2164+
aborted.
2165+
* Returns: {Promise} a promise for the final value of the reduction.
2166+
2167+
This method calls `fn` on each chunk of the stream in order, passing it the
2168+
result from the calculation on the previous element. It returns a promise for
2169+
the final value of the reduction.
2170+
2171+
The reducer function iterates the stream element-by-element which means that
2172+
there is no `concurrency` parameter or parallism. To perform a `reduce`
2173+
concurrently, it can be chained to the [`readable.map`][] method.
2174+
2175+
If no `initial` value is supplied the first chunk of the stream is used as the
2176+
initial value. If the stream is empty, the promise is rejected with a
2177+
`TypeError` with the `ERR_INVALID_ARGS` code property.
2178+
2179+
```mjs
2180+
import { Readable } from 'stream';
2181+
2182+
const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
2183+
return previous + data;
2184+
});
2185+
console.log(ten); // 10
2186+
```
2187+
21452188
### Duplex and transform streams
21462189

21472190
#### Class: `stream.Duplex`
@@ -4217,6 +4260,7 @@ contain multi-byte characters.
42174260
[`process.stdin`]: process.md#processstdin
42184261
[`process.stdout`]: process.md#processstdout
42194262
[`readable._read()`]: #readable_readsize
4263+
[`readable.map`]: #readablemapfn-options
42204264
[`readable.push('')`]: #readablepush
42214265
[`readable.setEncoding()`]: #readablesetencodingencoding
42224266
[`stream.Readable.from()`]: #streamreadablefromiterable-options

lib/internal/streams/end-of-stream.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const {
1717
validateObject,
1818
} = require('internal/validators');
1919

20+
const { Promise } = primordials;
21+
2022
const {
2123
isClosed,
2224
isReadable,
@@ -236,4 +238,17 @@ function eos(stream, options, callback) {
236238
return cleanup;
237239
}
238240

241+
function finished(stream, opts) {
242+
return new Promise((resolve, reject) => {
243+
eos(stream, opts, (err) => {
244+
if (err) {
245+
reject(err);
246+
} else {
247+
resolve();
248+
}
249+
});
250+
});
251+
}
252+
239253
module.exports = eos;
254+
module.exports.finished = finished;

lib/internal/streams/operators.js

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ const { AbortController } = require('internal/abort_controller');
55
const {
66
codes: {
77
ERR_INVALID_ARG_TYPE,
8+
ERR_MISSING_ARGS,
89
ERR_OUT_OF_RANGE,
910
},
1011
AbortError,
1112
} = require('internal/errors');
1213
const { validateInteger } = require('internal/validators');
1314
const { kWeakHandler } = require('internal/event_target');
15+
const { finished } = require('internal/streams/end-of-stream');
1416

1517
const {
1618
ArrayPrototypePush,
@@ -198,8 +200,8 @@ async function every(fn, options) {
198200
'fn', ['Function', 'AsyncFunction'], fn);
199201
}
200202
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
201-
return !(await some.call(this, async (x) => {
202-
return !(await fn(x));
203+
return !(await some.call(this, async (...args) => {
204+
return !(await fn(...args));
203205
}, options));
204206
}
205207

@@ -230,11 +232,61 @@ function filter(fn, options) {
230232
return this.map(filterFn, options);
231233
}
232234

235+
// Specific to provide better error to reduce since the argument is only
236+
// missing if the stream has no items in it - but the code is still appropriate
237+
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
238+
constructor() {
239+
super('reduce');
240+
this.message = 'Reduce of an empty stream requires an initial value';
241+
}
242+
}
243+
244+
async function reduce(reducer, initialValue, options) {
245+
if (typeof reducer !== 'function') {
246+
throw new ERR_INVALID_ARG_TYPE(
247+
'reducer', ['Function', 'AsyncFunction'], reducer);
248+
}
249+
let hasInitialValue = arguments.length > 1;
250+
if (options?.signal?.aborted) {
251+
const err = new AbortError(undefined, { cause: options.signal.reason });
252+
this.once('error', () => {}); // The error is already propagated
253+
await finished(this.destroy(err));
254+
throw err;
255+
}
256+
const ac = new AbortController();
257+
const signal = ac.signal;
258+
if (options?.signal) {
259+
const opts = { once: true, [kWeakHandler]: this };
260+
options.signal.addEventListener('abort', () => ac.abort(), opts);
261+
}
262+
let gotAnyItemFromStream = false;
263+
try {
264+
for await (const value of this) {
265+
gotAnyItemFromStream = true;
266+
if (options?.signal?.aborted) {
267+
throw new AbortError();
268+
}
269+
if (!hasInitialValue) {
270+
initialValue = value;
271+
hasInitialValue = true;
272+
} else {
273+
initialValue = await reducer(initialValue, value, { signal });
274+
}
275+
}
276+
if (!gotAnyItemFromStream && !hasInitialValue) {
277+
throw new ReduceAwareErrMissingArgs();
278+
}
279+
} finally {
280+
ac.abort();
281+
}
282+
return initialValue;
283+
}
284+
233285
async function toArray(options) {
234286
const result = [];
235287
for await (const val of this) {
236288
if (options?.signal?.aborted) {
237-
throw new AbortError({ cause: options.signal.reason });
289+
throw new AbortError(undefined, { cause: options.signal.reason });
238290
}
239291
ArrayPrototypePush(result, val);
240292
}
@@ -312,6 +364,7 @@ module.exports.streamReturningOperators = {
312364
module.exports.promiseReturningOperators = {
313365
every,
314366
forEach,
367+
reduce,
315368
toArray,
316369
some,
317370
};

lib/stream/promises.js

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const {
1111
} = require('internal/streams/utils');
1212

1313
const { pipelineImpl: pl } = require('internal/streams/pipeline');
14-
const eos = require('internal/streams/end-of-stream');
14+
const { finished } = require('internal/streams/end-of-stream');
1515

1616
function pipeline(...streams) {
1717
return new Promise((resolve, reject) => {
@@ -35,18 +35,6 @@ function pipeline(...streams) {
3535
});
3636
}
3737

38-
function finished(stream, opts) {
39-
return new Promise((resolve, reject) => {
40-
eos(stream, opts, (err) => {
41-
if (err) {
42-
reject(err);
43-
} else {
44-
resolve();
45-
}
46-
});
47-
});
48-
}
49-
5038
module.exports = {
5139
finished,
5240
pipeline,
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
function sum(p, c) {
10+
return p + c;
11+
}
12+
13+
{
14+
// Does the same thing as `(await stream.toArray()).reduce(...)`
15+
(async () => {
16+
const tests = [
17+
[[], sum, 0],
18+
[[1], sum, 0],
19+
[[1, 2, 3, 4, 5], sum, 0],
20+
[[...Array(100).keys()], sum, 0],
21+
[['a', 'b', 'c'], sum, ''],
22+
[[1, 2], sum],
23+
[[1, 2, 3], (x, y) => y],
24+
];
25+
for (const [values, fn, initial] of tests) {
26+
const streamReduce = await Readable.from(values)
27+
.reduce(fn, initial);
28+
const arrayReduce = values.reduce(fn, initial);
29+
assert.deepStrictEqual(streamReduce, arrayReduce);
30+
}
31+
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
32+
// asynchronous reducer
33+
for (const [values, fn, initial] of tests) {
34+
const streamReduce = await Readable.from(values)
35+
.map(async (x) => x)
36+
.reduce(fn, initial);
37+
const arrayReduce = values.reduce(fn, initial);
38+
assert.deepStrictEqual(streamReduce, arrayReduce);
39+
}
40+
})().then(common.mustCall());
41+
}
42+
{
43+
// Works with an async reducer, with or without initial value
44+
(async () => {
45+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
46+
assert.strictEqual(six, 6);
47+
})().then(common.mustCall());
48+
(async () => {
49+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
50+
assert.strictEqual(six, 6);
51+
})().then(common.mustCall());
52+
}
53+
{
54+
// Works lazily
55+
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
56+
.map(common.mustCall((x) => {
57+
return x;
58+
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
59+
.reduce(async (p, c) => {
60+
if (p === 1) {
61+
throw new Error('boom');
62+
}
63+
return c;
64+
}, 0)
65+
, /boom/).then(common.mustCall());
66+
}
67+
68+
{
69+
// Support for AbortSignal
70+
const ac = new AbortController();
71+
assert.rejects(async () => {
72+
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
73+
if (c === 3) {
74+
await new Promise(() => {}); // Explicitly do not pass signal here
75+
}
76+
return Promise.resolve();
77+
}, 0, { signal: ac.signal });
78+
}, {
79+
name: 'AbortError',
80+
}).then(common.mustCall());
81+
ac.abort();
82+
}
83+
84+
85+
{
86+
// Support for AbortSignal - pre aborted
87+
const stream = Readable.from([1, 2, 3]);
88+
assert.rejects(async () => {
89+
await stream.reduce(async (p, c) => {
90+
if (c === 3) {
91+
await new Promise(() => {}); // Explicitly do not pass signal here
92+
}
93+
return Promise.resolve();
94+
}, 0, { signal: AbortSignal.abort() });
95+
}, {
96+
name: 'AbortError',
97+
}).then(common.mustCall(() => {
98+
assert.strictEqual(stream.destroyed, true);
99+
}));
100+
}
101+
102+
{
103+
// Support for AbortSignal - deep
104+
const stream = Readable.from([1, 2, 3]);
105+
assert.rejects(async () => {
106+
await stream.reduce(async (p, c, { signal }) => {
107+
signal.addEventListener('abort', common.mustCall(), { once: true });
108+
if (c === 3) {
109+
await new Promise(() => {}); // Explicitly do not pass signal here
110+
}
111+
return Promise.resolve();
112+
}, 0, { signal: AbortSignal.abort() });
113+
}, {
114+
name: 'AbortError',
115+
}).then(common.mustCall(() => {
116+
assert.strictEqual(stream.destroyed, true);
117+
}));
118+
}
119+
120+
{
121+
// Error cases
122+
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
123+
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
124+
}
125+
126+
{
127+
// Test result is a Promise
128+
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
129+
assert.ok(result instanceof Promise);
130+
}

0 commit comments

Comments
 (0)