Skip to content

Commit 3ba0b21

Browse files
fix: only unassign currentMessage after fully processing it
1 parent 826f6a0 commit 3ba0b21

File tree

2 files changed

+75
-8
lines changed

2 files changed

+75
-8
lines changed

src/incoming-message-stream.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,21 @@ class IncomingMessageStream extends Transform {
6464
}
6565

6666
if (packet.isLast()) {
67-
this.currentMessage = undefined;
6867
// Wait until the current message was fully processed before we
6968
// continue processing any remaining messages.
7069
message.once('end', () => {
70+
this.currentMessage = undefined;
7171
this.processBufferedData(callback);
7272
});
7373
message.end(packet.data());
7474
return;
75-
} else {
75+
} else if (!message.write(packet.data())) {
7676
// If too much data is buffering up in the
7777
// current message, wait for it to drain.
78-
if (!message.write(packet.data())) {
79-
message.once('drain', () => {
80-
this.processBufferedData(callback);
81-
});
82-
return;
83-
}
78+
message.once('drain', () => {
79+
this.processBufferedData(callback);
80+
});
81+
return;
8482
}
8583
} else {
8684
break;

test/unit/incoming-message-stream-test.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,73 @@ describe('IncomingMessageStream', function() {
106106
});
107107
});
108108
});
109+
110+
it('correctly handles the last package coming in after the stream was paused', function(done) {
111+
const packetData = Buffer.from('test1234');
112+
const packetHeader = Buffer.alloc(8);
113+
114+
let offset = 0;
115+
offset = packetHeader.writeUInt8(0x11, offset);
116+
offset = packetHeader.writeUInt8(0x00, offset);
117+
offset = packetHeader.writeUInt16BE(8 + packetData.length, offset);
118+
offset = packetHeader.writeUInt16BE(0x0000, offset);
119+
offset = packetHeader.writeUInt8(1, offset);
120+
packetHeader.writeUInt8(0x00, offset);
121+
122+
const firstPacket = Buffer.concat([packetHeader, packetData]);
123+
124+
offset = 0;
125+
offset = packetHeader.writeUInt8(0x11, offset);
126+
offset = packetHeader.writeUInt8(0x01, offset);
127+
offset = packetHeader.writeUInt16BE(8 + packetData.length, offset);
128+
offset = packetHeader.writeUInt16BE(0x0000, offset);
129+
offset = packetHeader.writeUInt8(1, offset);
130+
packetHeader.writeUInt8(0x00, offset);
131+
132+
const secondPacket = Buffer.concat([packetHeader, packetData]);
133+
134+
const incoming = new IncomingMessageStream(new Debug());
135+
136+
const result = new BufferList(function(err, res) {
137+
if (err) {
138+
return done(err);
139+
}
140+
141+
assert.deepEqual(res, Buffer.concat([ packetData, packetData ]));
142+
143+
done();
144+
});
145+
146+
let messageEnded = false;
147+
incoming.on('data', function(message) {
148+
assert.instanceOf(message, Message);
149+
150+
message.on('end', function() {
151+
messageEnded = true;
152+
});
153+
154+
message.pipe(result);
155+
});
156+
157+
incoming.write(firstPacket, function() {
158+
const writtenData = result.slice();
159+
160+
assert.strictEqual(writtenData.length, 8);
161+
assert.deepEqual(writtenData, packetData);
162+
163+
incoming.pause();
164+
165+
incoming.write(secondPacket, function() {
166+
const writtenData = result.slice();
167+
168+
assert.strictEqual(writtenData.length, 16);
169+
assert.deepEqual(writtenData, Buffer.concat([ packetData, packetData ]));
170+
171+
assert.strictEqual(messageEnded, true);
172+
});
173+
174+
assert.isFalse(messageEnded);
175+
incoming.resume();
176+
});
177+
});
109178
});

0 commit comments

Comments
 (0)