Skip to content

Commit 6654c29

Browse files
committed
stream: lazy allocate back pressure buffer
PR-URL: nodejs#50013
1 parent 85c09f1 commit 6654c29

File tree

2 files changed

+42
-21
lines changed

2 files changed

+42
-21
lines changed

lib/internal/http2/core.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2174,7 +2174,7 @@ class Http2Stream extends Duplex {
21742174
process.nextTick(() => {
21752175
if (writeCallbackErr ||
21762176
!this._writableState.ending ||
2177-
this._writableState.buffered.length ||
2177+
this._writableState.buffered?.length ||
21782178
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
21792179
return endCheckCallback();
21802180
debugStreamObj(this, 'shutting down writable on last write');

lib/internal/streams/writable.js

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ const kErroredValue = Symbol('kErroredValue');
7777
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
7878
const kWriteCbValue = Symbol('kWriteCbValue');
7979
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
80+
const kBufferedValue = Symbol('kBufferedValue');
8081

8182
const kObjectMode = 1 << 0;
8283
const kEnded = 1 << 1;
@@ -108,6 +109,7 @@ const kWriteCb = 1 << 26;
108109
const kExpectWriteCb = 1 << 27;
109110
const kAfterWriteTickInfo = 1 << 28;
110111
const kAfterWritePending = 1 << 29;
112+
const kBuffered = 1 << 30;
111113

112114
// TODO(benjamingr) it is likely slower to do it this way than with free functions
113115
function makeBitMapDescriptor(bit) {
@@ -269,6 +271,21 @@ ObjectDefineProperties(WritableState.prototype, {
269271
}
270272
},
271273
},
274+
275+
buffered: {
276+
__proto__: null,
277+
enumerable: false,
278+
get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; },
279+
set(value) {
280+
if (value) {
281+
this[kBufferedValue] = value;
282+
this.state |= kBuffered;
283+
} else {
284+
this.state &= ~kBuffered;
285+
}
286+
},
287+
},
288+
272289
});
273290

274291
function WritableState(options, stream, isDuplex) {
@@ -337,19 +354,20 @@ function WritableState(options, stream, isDuplex) {
337354
}
338355

339356
function resetBuffer(state) {
340-
state.buffered = [];
357+
state[kBufferedValue] = null;
341358
state.bufferedIndex = 0;
342359
state.state |= kAllBuffers | kAllNoop;
360+
state.state &= ~kBuffered;
343361
}
344362

345363
WritableState.prototype.getBuffer = function getBuffer() {
346-
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
364+
return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
347365
};
348366

349367
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
350368
__proto__: null,
351369
get() {
352-
return this.buffered.length - this.bufferedIndex;
370+
return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex;
353371
},
354372
});
355373

@@ -522,7 +540,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
522540
}
523541

524542
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
525-
state.buffered.push({ chunk, encoding, callback });
543+
if ((state.state & kBuffered) === 0) {
544+
state.state |= kBuffered;
545+
state[kBufferedValue] = [];
546+
}
547+
548+
state[kBufferedValue].push({ chunk, encoding, callback });
526549
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
527550
state.state &= ~kAllBuffers;
528551
}
@@ -607,7 +630,7 @@ function onwrite(stream, er) {
607630
onwriteError(stream, state, er, cb);
608631
}
609632
} else {
610-
if (state.buffered.length > state.bufferedIndex) {
633+
if ((state.state & kBuffered) !== 0) {
611634
clearBuffer(stream, state);
612635
}
613636

@@ -677,11 +700,13 @@ function errorBuffer(state) {
677700
return;
678701
}
679702

680-
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
681-
const { chunk, callback } = state.buffered[n];
682-
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
683-
state.length -= len;
684-
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
703+
if ((state.state & kBuffered) !== 0) {
704+
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
705+
const { chunk, callback } = state[kBufferedValue][n];
706+
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
707+
state.length -= len;
708+
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
709+
}
685710
}
686711

687712

@@ -692,13 +717,12 @@ function errorBuffer(state) {
692717

693718
// If there's something in the buffer waiting, then process it.
694719
function clearBuffer(stream, state) {
695-
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
696-
(state.state & kConstructed) === 0) {
720+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) {
697721
return;
698722
}
699723

700724
const objectMode = (state.state & kObjectMode) !== 0;
701-
const { buffered, bufferedIndex } = state;
725+
const { [kBufferedValue]: buffered, bufferedIndex } = state;
702726
const bufferedLength = buffered.length - bufferedIndex;
703727

704728
if (!bufferedLength) {
@@ -828,10 +852,9 @@ function needFinish(state) {
828852
kWriting |
829853
kErrorEmitted |
830854
kCloseEmitted |
831-
kErrored
832-
)) === (kEnding | kConstructed) &&
833-
state.length === 0 &&
834-
state.buffered.length === 0);
855+
kErrored |
856+
kBuffered
857+
)) === (kEnding | kConstructed) && state.length === 0);
835858
}
836859

837860
function callFinal(stream, state) {
@@ -1073,9 +1096,7 @@ Writable.prototype.destroy = function(err, cb) {
10731096
const state = this._writableState;
10741097

10751098
// Invoke pending callbacks.
1076-
if ((state.state & kDestroyed) === 0 &&
1077-
(state.bufferedIndex < state.buffered.length ||
1078-
(state.state & kOnFinished) !== 0)) {
1099+
if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) {
10791100
process.nextTick(errorBuffer, state);
10801101
}
10811102

0 commit comments

Comments
 (0)