From 899cf825096db10a680a73933efa256888b86b59 Mon Sep 17 00:00:00 2001 From: Anna Gringauze Date: Fri, 9 Sep 2022 17:08:34 -0700 Subject: [PATCH 1/5] Batch expression evaluations from the same library --- .../src/services/chrome_proxy_service.dart | 3 +- .../src/services/expression_evaluator.dart | 117 ++++++++++++++++++ dwds/test/evaluate_common.dart | 19 +++ .../expression_compiler_service_test.dart | 77 ++++++++++++ 4 files changed, 215 insertions(+), 1 deletion(-) diff --git a/dwds/lib/src/services/chrome_proxy_service.dart b/dwds/lib/src/services/chrome_proxy_service.dart index 2d3a89e1f..5bb93b1a3 100644 --- a/dwds/lib/src/services/chrome_proxy_service.dart +++ b/dwds/lib/src/services/chrome_proxy_service.dart @@ -241,7 +241,7 @@ class ChromeProxyService implements VmServiceInterface { final compiler = _compiler; _expressionEvaluator = compiler == null ? null - : ExpressionEvaluator( + : BatchedExpressionEvaluator( entrypoint, inspector, debugger, @@ -318,6 +318,7 @@ class ChromeProxyService implements VmServiceInterface { _inspector = null; _previousBreakpoints.clear(); _previousBreakpoints.addAll(isolate.breakpoints ?? []); + _expressionEvaluator?.close(); _consoleSubscription?.cancel(); _consoleSubscription = null; } diff --git a/dwds/lib/src/services/expression_evaluator.dart b/dwds/lib/src/services/expression_evaluator.dart index 6ae50ff16..0dc5045cc 100644 --- a/dwds/lib/src/services/expression_evaluator.dart +++ b/dwds/lib/src/services/expression_evaluator.dart @@ -2,6 +2,9 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + +import 'package:collection/collection.dart'; import 'package:dwds/src/utilities/domain.dart'; import 'package:logging/logging.dart'; import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart'; @@ -12,6 +15,7 @@ import '../debugging/location.dart'; import '../debugging/modules.dart'; import '../loaders/strategy.dart'; import '../utilities/objects.dart' as chrome; +import '../utilities/batched_stream.dart'; import 'expression_compiler.dart'; class ErrorKind { @@ -29,6 +33,117 @@ class ErrorKind { String toString() => _kind; } +class EvaluateRequest { + final String isolateId; + final String? libraryUri; + final String expression; + final Map? scope; + final completer = Completer(); + + EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope); +} + +class BatchedExpressionEvaluator extends ExpressionEvaluator { + final _requestController = + BatchedStreamController(delay: 200); + + BatchedExpressionEvaluator( + String entrypoint, + AppInspectorInterface inspector, + Debugger debugger, + Locations locations, + Modules modules, + ExpressionCompiler compiler, + ) : super(entrypoint, inspector, debugger, locations, modules, compiler) { + _requestController.stream.listen(_processRequest); + } + + @override + void close() => _requestController.close(); + + @override + Future evaluateExpression( + String isolateId, + String? libraryUri, + String expression, + Map? scope, + ) { + final request = EvaluateRequest(isolateId, libraryUri, expression, scope); + _requestController.sink.add(request); + return request.completer.future; + } + + void _processRequest(List requests) async { + String? libraryUri; + String? isolateId; + Map? scope; + List currentRequests = []; + + for (var request in requests) { + libraryUri ??= request.libraryUri; + isolateId ??= request.isolateId; + scope ??= request.scope; + + if (libraryUri != request.libraryUri || + isolateId != request.isolateId || + !MapEquality().equals(scope, request.scope)) { + _logger.fine('New batch due to'); + if (libraryUri != request.libraryUri) { + _logger.fine(' - library uri: $libraryUri != ${request.libraryUri}'); + } + if (isolateId != request.isolateId) { + _logger.fine(' - isolateId: $isolateId != ${request.isolateId}'); + } + if (!MapEquality().equals(scope, request.scope)) { + _logger.fine(' - scope: $scope != ${request.scope}'); + } + + _evaluateBatch(currentRequests); + currentRequests = []; + libraryUri = request.libraryUri; + isolateId = request.isolateId; + scope = request.scope; + } + currentRequests.add(request); + } + _evaluateBatch(currentRequests); + } + + void _evaluateBatch(List requests) async { + if (requests.isEmpty) return; + + final first = requests.first; + if (requests.length == 1) { + super + .evaluateExpression( + first.isolateId, first.libraryUri, first.expression, first.scope) + .then(requests.first.completer.complete); + } + + final expressions = requests.map((r) => r.expression).join(', '); + final batchedExpression = '[ $expressions ]'; + + _logger.fine('Evaluating batch of expressions $batchedExpression'); + + final RemoteObject list = await super.evaluateExpression( + first.isolateId, first.libraryUri, batchedExpression, first.scope); + + for (var i = 0; i < requests.length; i++) { + final request = requests[i]; + _logger.fine('Getting result out of a batch for ${request.expression}'); + _debugger + .getProperties(list.objectId!, + offset: i, count: 1, length: requests.length) + .then((v) { + final result = v.first.value; + _logger.fine( + 'Got result out of a batch for ${request.expression}: $result'); + request.completer.complete(result); + }); + } + } +} + /// ExpressionEvaluator provides functionality to evaluate dart expressions /// from text user input in the debugger, using chrome remote debugger to /// collect context for evaluation (scope, types, modules), and using @@ -61,6 +176,8 @@ class ExpressionEvaluator { {'type': '$severity', 'value': message}); } + void close() {} + /// Evaluate dart expression inside a given library. /// /// Uses ExpressionCompiler interface to compile the expression to diff --git a/dwds/test/evaluate_common.dart b/dwds/test/evaluate_common.dart index 3ca60cc9c..b03ceff5b 100644 --- a/dwds/test/evaluate_common.dart +++ b/dwds/test/evaluate_common.dart @@ -539,6 +539,25 @@ void testAll({ tearDown(() async {}); + test('in parallel (in a batch)', () async { + final library = isolate.rootLib!; + final evaluation1 = setup.service + .evaluate(isolateId, library.id!, 'MainClass(0).toString()'); + final evaluation2 = setup.service + .evaluate(isolateId, library.id!, 'MainClass(1).toString()'); + + final results = await Future.wait([evaluation1, evaluation2]); + expect( + results[0], + const TypeMatcher().having( + (instance) => instance.valueAsString, 'valueAsString', '0')); + + expect( + results[1], + const TypeMatcher().having( + (instance) => instance.valueAsString, 'valueAsString', '1')); + }); + test('with scope override', () async { final library = isolate.rootLib!; final object = await setup.service diff --git a/dwds/test/expression_compiler_service_test.dart b/dwds/test/expression_compiler_service_test.dart index ab873b847..dc4f26721 100644 --- a/dwds/test/expression_compiler_service_test.dart +++ b/dwds/test/expression_compiler_service_test.dart @@ -165,6 +165,83 @@ void main() async { await stop(); }); + + test('can evaluate multiple expressions', () async { + expect(output.stream, neverEmits(contains('[SEVERE]'))); + expect( + output.stream, + emitsThrough(contains( + '[INFO] ExpressionCompilerService: Updating dependencies...'))); + expect( + output.stream, + emitsThrough(contains( + '[INFO] ExpressionCompilerService: Updated dependencies.'))); + + expect(output.stream, + emitsThrough(contains('[INFO] ExpressionCompilerService: Stopped.'))); + final result = await service + .updateDependencies({'try': ModuleInfo('try.full.dill', 'try.dill')}); + expect(result, true, reason: 'failed to update dependencies'); + + final compilationResult1 = await service.compileExpressionToJs( + '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'true'); + final compilationResult2 = await service.compileExpressionToJs( + '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'false'); + + expect( + compilationResult1, + isA() + .having((r) => r.result, 'result', contains('return true;')) + .having((r) => r.isError, 'isError', false)); + + expect( + compilationResult2, + isA() + .having((r) => r.result, 'result', contains('return false;')) + .having((r) => r.isError, 'isError', false)); + + await stop(); + }); + + test('can compile multiple expressions in parallel', () async { + expect(output.stream, neverEmits(contains('[SEVERE]'))); + expect( + output.stream, + emitsThrough(contains( + '[INFO] ExpressionCompilerService: Updating dependencies...'))); + expect( + output.stream, + emitsThrough(contains( + '[INFO] ExpressionCompilerService: Updated dependencies.'))); + + expect(output.stream, + emitsThrough(contains('[INFO] ExpressionCompilerService: Stopped.'))); + final result = await service + .updateDependencies({'try': ModuleInfo('try.full.dill', 'try.dill')}); + expect(result, true, reason: 'failed to update dependencies'); + + final compilationResult1 = service.compileExpressionToJs( + '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'true'); + final compilationResult2 = service.compileExpressionToJs( + '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'false'); + + final results = + await Future.wait([compilationResult1, compilationResult2]); + + expect( + results[0], + isA() + .having((r) => r.result, 'result', contains('return true;')) + .having((r) => r.isError, 'isError', false)); + + expect( + results[1], + isA() + .having((r) => r.result, 'result', contains('return false;')) + .having((r) => r.isError, 'isError', false)); + + await stop(); + }); }); } From fdd5bdddb239bfd314fc00ff1ffb7cb693db1a3e Mon Sep 17 00:00:00 2001 From: Anna Gringauze Date: Fri, 9 Sep 2022 18:05:56 -0700 Subject: [PATCH 2/5] Update changelog --- dwds/CHANGELOG.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dwds/CHANGELOG.md b/dwds/CHANGELOG.md index 1ce76f114..612b520a5 100644 --- a/dwds/CHANGELOG.md +++ b/dwds/CHANGELOG.md @@ -1,7 +1,11 @@ ## 16.0.1-dev -- Allow `LoadStrategy.serverPathForModule` and `LoadStrategy.sourceMapPathForModule` - to return `null` and add error handling. +- Allow the following API to return `null` and add error handling: + - `LoadStrategy.serverPathForModule` + - `LoadStrategy.sourceMapPathForModule` +- Expression evaluation performance improvement: + - Batch `ChromeProxyService.evaluate()` requests that are close in time + and are executed in the same library and scope. ## 16.0.0 From 0d99f9d19e377c91f0f4d594d32a71555490a3df Mon Sep 17 00:00:00 2001 From: Anna Gringauze Date: Tue, 13 Sep 2022 11:49:12 -0700 Subject: [PATCH 3/5] Fix tests failures --- dwds/lib/src/services/expression_evaluator.dart | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dwds/lib/src/services/expression_evaluator.dart b/dwds/lib/src/services/expression_evaluator.dart index 0dc5045cc..7bb66457c 100644 --- a/dwds/lib/src/services/expression_evaluator.dart +++ b/dwds/lib/src/services/expression_evaluator.dart @@ -98,7 +98,7 @@ class BatchedExpressionEvaluator extends ExpressionEvaluator { _logger.fine(' - scope: $scope != ${request.scope}'); } - _evaluateBatch(currentRequests); + unawaited(_evaluateBatch(currentRequests)); currentRequests = []; libraryUri = request.libraryUri; isolateId = request.isolateId; @@ -106,15 +106,16 @@ class BatchedExpressionEvaluator extends ExpressionEvaluator { } currentRequests.add(request); } - _evaluateBatch(currentRequests); + unawaited(_evaluateBatch(currentRequests)); } - void _evaluateBatch(List requests) async { + Future _evaluateBatch(List requests) async { if (requests.isEmpty) return; final first = requests.first; if (requests.length == 1) { - super + if (first.completer.isCompleted) return; + return super .evaluateExpression( first.isolateId, first.libraryUri, first.expression, first.scope) .then(requests.first.completer.complete); @@ -130,6 +131,7 @@ class BatchedExpressionEvaluator extends ExpressionEvaluator { for (var i = 0; i < requests.length; i++) { final request = requests[i]; + if (request.completer.isCompleted) continue; _logger.fine('Getting result out of a batch for ${request.expression}'); _debugger .getProperties(list.objectId!, From 22e8c500684d45c9fe5f72967921f79f5c41ffde Mon Sep 17 00:00:00 2001 From: Anna Gringauze Date: Wed, 14 Sep 2022 15:17:27 -0700 Subject: [PATCH 4/5] Destroy isolate on app connection exit --- dwds/lib/src/connections/app_connection.dart | 7 ++++++- dwds/lib/src/services/chrome_proxy_service.dart | 3 +++ dwds/lib/src/services/expression_evaluator.dart | 5 ++++- webdev/test/e2e_test.dart | 1 + 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dwds/lib/src/connections/app_connection.dart b/dwds/lib/src/connections/app_connection.dart index 1dd7ed1f1..fc19384c8 100644 --- a/dwds/lib/src/connections/app_connection.dart +++ b/dwds/lib/src/connections/app_connection.dart @@ -15,14 +15,19 @@ class AppConnection { /// The initial connection request sent from the application in the browser. final ConnectRequest request; final _startedCompleter = Completer(); + final _doneCompleter = Completer(); final SocketConnection _connection; - AppConnection(this.request, this._connection); + AppConnection(this.request, this._connection) { + unawaited(_connection.sink.done.then((v) => _doneCompleter.complete())); + } bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod; void shutDown() => _connection.shutdown(); bool get isStarted => _startedCompleter.isCompleted; Future get onStart => _startedCompleter.future; + bool get isDone => _doneCompleter.isCompleted; + Future get onDone => _doneCompleter.future; void runMain() { if (_startedCompleter.isCompleted) { diff --git a/dwds/lib/src/services/chrome_proxy_service.dart b/dwds/lib/src/services/chrome_proxy_service.dart index 5bb93b1a3..323876cc2 100644 --- a/dwds/lib/src/services/chrome_proxy_service.dart +++ b/dwds/lib/src/services/chrome_proxy_service.dart @@ -259,6 +259,8 @@ class ChromeProxyService implements VmServiceInterface { _startedCompleter.complete(); })); + unawaited(appConnection.onDone.then((_) => destroyIsolate())); + final isolateRef = inspector.isolateRef; final timestamp = DateTime.now().millisecondsSinceEpoch; @@ -301,6 +303,7 @@ class ChromeProxyService implements VmServiceInterface { /// /// Clears out the [_inspector] and all related cached information. void destroyIsolate() { + _logger.fine('Destroying isolate'); if (!_isIsolateRunning) return; final isolate = inspector.isolate; final isolateRef = inspector.isolateRef; diff --git a/dwds/lib/src/services/expression_evaluator.dart b/dwds/lib/src/services/expression_evaluator.dart index 7bb66457c..f0d75ab87 100644 --- a/dwds/lib/src/services/expression_evaluator.dart +++ b/dwds/lib/src/services/expression_evaluator.dart @@ -59,7 +59,10 @@ class BatchedExpressionEvaluator extends ExpressionEvaluator { } @override - void close() => _requestController.close(); + void close() { + _logger.fine('Closed'); + _requestController.close(); + } @override Future evaluateExpression( diff --git a/webdev/test/e2e_test.dart b/webdev/test/e2e_test.dart index 29a4b6b1e..ecdb3536c 100644 --- a/webdev/test/e2e_test.dart +++ b/webdev/test/e2e_test.dart @@ -321,6 +321,7 @@ void main() { wsUri = getDebugServiceUri(message as String); return wsUri != null; })); + Logger.root.fine('vm service uri: $wsUri'); expect(wsUri, isNotNull); vmService = await vmServiceConnectUri(wsUri); From 35a965eea5003a29448b24563dad6b008f155a65 Mon Sep 17 00:00:00 2001 From: Anna Gringauze Date: Wed, 14 Sep 2022 17:50:30 -0700 Subject: [PATCH 5/5] Fix failure to compile debug extension and client with dart2js --- dwds/debug_extension/web/background.dart | 5 +- dwds/lib/src/injected/client.js | 2 +- .../batched_expression_evaluator.dart | 135 ++++++++++++++++++ .../src/services/chrome_proxy_service.dart | 1 + .../src/services/expression_evaluator.dart | 118 --------------- .../lib/src/web_utilities/batched_stream.dart | 85 +++++++++++ dwds/web/client.dart | 5 +- 7 files changed, 230 insertions(+), 121 deletions(-) create mode 100644 dwds/lib/src/services/batched_expression_evaluator.dart create mode 100644 dwds/lib/src/web_utilities/batched_stream.dart diff --git a/dwds/debug_extension/web/background.dart b/dwds/debug_extension/web/background.dart index a7a25e5d7..350291c77 100644 --- a/dwds/debug_extension/web/background.dart +++ b/dwds/debug_extension/web/background.dart @@ -20,7 +20,10 @@ import 'package:dwds/data/devtools_request.dart'; import 'package:dwds/data/extension_request.dart'; import 'package:dwds/data/serializers.dart'; import 'package:dwds/src/sockets.dart'; -import 'package:dwds/src/utilities/batched_stream.dart'; +// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart' +// makes dart2js skip creating background.js, so we use a copy instead. +// import 'package:dwds/src/utilities/batched_stream.dart'; +import 'package:dwds/src/web_utilities/batched_stream.dart'; import 'package:js/js.dart'; import 'package:js/js_util.dart' as js_util; import 'package:pub_semver/pub_semver.dart'; diff --git a/dwds/lib/src/injected/client.js b/dwds/lib/src/injected/client.js index 0390ffcfd..66f32e907 100644 --- a/dwds/lib/src/injected/client.js +++ b/dwds/lib/src/injected/client.js @@ -1,4 +1,4 @@ -// Generated by dart2js (NullSafetyMode.sound, csp, deferred-serialization, intern-composite-values), the Dart to JavaScript compiler version: 2.19.0-edge.6682ac145d5b99fa05a034c7838b94c5d1143f78. +// Generated by dart2js (NullSafetyMode.sound, csp, deferred-serialization, intern-composite-values), the Dart to JavaScript compiler version: 2.19.0-177.0.dev. // The code supports the following hooks: // dartPrint(message): // if this function is defined it is called instead of the Dart [print] diff --git a/dwds/lib/src/services/batched_expression_evaluator.dart b/dwds/lib/src/services/batched_expression_evaluator.dart new file mode 100644 index 000000000..64338fd38 --- /dev/null +++ b/dwds/lib/src/services/batched_expression_evaluator.dart @@ -0,0 +1,135 @@ +// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:collection/collection.dart'; +import 'package:dwds/src/utilities/domain.dart'; +import 'package:logging/logging.dart'; +import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart'; + +import '../debugging/debugger.dart'; +import '../debugging/location.dart'; +import '../debugging/modules.dart'; +import '../utilities/batched_stream.dart'; +import 'expression_compiler.dart'; +import 'expression_evaluator.dart'; + +class EvaluateRequest { + final String isolateId; + final String? libraryUri; + final String expression; + final Map? scope; + final completer = Completer(); + + EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope); +} + +class BatchedExpressionEvaluator extends ExpressionEvaluator { + final _logger = Logger('BatchedExpressionEvaluator'); + final Debugger _debugger; + final _requestController = + BatchedStreamController(delay: 200); + + BatchedExpressionEvaluator( + String entrypoint, + AppInspectorInterface inspector, + this._debugger, + Locations locations, + Modules modules, + ExpressionCompiler compiler, + ) : super(entrypoint, inspector, _debugger, locations, modules, compiler) { + _requestController.stream.listen(_processRequest); + } + + @override + void close() { + _logger.fine('Closed'); + _requestController.close(); + } + + @override + Future evaluateExpression( + String isolateId, + String? libraryUri, + String expression, + Map? scope, + ) { + final request = EvaluateRequest(isolateId, libraryUri, expression, scope); + _requestController.sink.add(request); + return request.completer.future; + } + + void _processRequest(List requests) async { + String? libraryUri; + String? isolateId; + Map? scope; + List currentRequests = []; + + for (var request in requests) { + libraryUri ??= request.libraryUri; + isolateId ??= request.isolateId; + scope ??= request.scope; + + if (libraryUri != request.libraryUri || + isolateId != request.isolateId || + !MapEquality().equals(scope, request.scope)) { + _logger.fine('New batch due to'); + if (libraryUri != request.libraryUri) { + _logger.fine(' - library uri: $libraryUri != ${request.libraryUri}'); + } + if (isolateId != request.isolateId) { + _logger.fine(' - isolateId: $isolateId != ${request.isolateId}'); + } + if (!MapEquality().equals(scope, request.scope)) { + _logger.fine(' - scope: $scope != ${request.scope}'); + } + + unawaited(_evaluateBatch(currentRequests)); + currentRequests = []; + libraryUri = request.libraryUri; + isolateId = request.isolateId; + scope = request.scope; + } + currentRequests.add(request); + } + unawaited(_evaluateBatch(currentRequests)); + } + + Future _evaluateBatch(List requests) async { + if (requests.isEmpty) return; + + final first = requests.first; + if (requests.length == 1) { + if (first.completer.isCompleted) return; + return super + .evaluateExpression( + first.isolateId, first.libraryUri, first.expression, first.scope) + .then(requests.first.completer.complete); + } + + final expressions = requests.map((r) => r.expression).join(', '); + final batchedExpression = '[ $expressions ]'; + + _logger.fine('Evaluating batch of expressions $batchedExpression'); + + final RemoteObject list = await super.evaluateExpression( + first.isolateId, first.libraryUri, batchedExpression, first.scope); + + for (var i = 0; i < requests.length; i++) { + final request = requests[i]; + if (request.completer.isCompleted) continue; + _logger.fine('Getting result out of a batch for ${request.expression}'); + _debugger + .getProperties(list.objectId!, + offset: i, count: 1, length: requests.length) + .then((v) { + final result = v.first.value; + _logger.fine( + 'Got result out of a batch for ${request.expression}: $result'); + request.completer.complete(result); + }); + } + } +} diff --git a/dwds/lib/src/services/chrome_proxy_service.dart b/dwds/lib/src/services/chrome_proxy_service.dart index 323876cc2..daf4237a3 100644 --- a/dwds/lib/src/services/chrome_proxy_service.dart +++ b/dwds/lib/src/services/chrome_proxy_service.dart @@ -30,6 +30,7 @@ import '../utilities/dart_uri.dart'; import '../utilities/sdk_configuration.dart'; import '../utilities/shared.dart'; import 'expression_evaluator.dart'; +import 'batched_expression_evaluator.dart'; /// A proxy from the chrome debug protocol to the dart vm service protocol. class ChromeProxyService implements VmServiceInterface { diff --git a/dwds/lib/src/services/expression_evaluator.dart b/dwds/lib/src/services/expression_evaluator.dart index f0d75ab87..35aef1cd6 100644 --- a/dwds/lib/src/services/expression_evaluator.dart +++ b/dwds/lib/src/services/expression_evaluator.dart @@ -4,7 +4,6 @@ import 'dart:async'; -import 'package:collection/collection.dart'; import 'package:dwds/src/utilities/domain.dart'; import 'package:logging/logging.dart'; import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart'; @@ -15,7 +14,6 @@ import '../debugging/location.dart'; import '../debugging/modules.dart'; import '../loaders/strategy.dart'; import '../utilities/objects.dart' as chrome; -import '../utilities/batched_stream.dart'; import 'expression_compiler.dart'; class ErrorKind { @@ -33,122 +31,6 @@ class ErrorKind { String toString() => _kind; } -class EvaluateRequest { - final String isolateId; - final String? libraryUri; - final String expression; - final Map? scope; - final completer = Completer(); - - EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope); -} - -class BatchedExpressionEvaluator extends ExpressionEvaluator { - final _requestController = - BatchedStreamController(delay: 200); - - BatchedExpressionEvaluator( - String entrypoint, - AppInspectorInterface inspector, - Debugger debugger, - Locations locations, - Modules modules, - ExpressionCompiler compiler, - ) : super(entrypoint, inspector, debugger, locations, modules, compiler) { - _requestController.stream.listen(_processRequest); - } - - @override - void close() { - _logger.fine('Closed'); - _requestController.close(); - } - - @override - Future evaluateExpression( - String isolateId, - String? libraryUri, - String expression, - Map? scope, - ) { - final request = EvaluateRequest(isolateId, libraryUri, expression, scope); - _requestController.sink.add(request); - return request.completer.future; - } - - void _processRequest(List requests) async { - String? libraryUri; - String? isolateId; - Map? scope; - List currentRequests = []; - - for (var request in requests) { - libraryUri ??= request.libraryUri; - isolateId ??= request.isolateId; - scope ??= request.scope; - - if (libraryUri != request.libraryUri || - isolateId != request.isolateId || - !MapEquality().equals(scope, request.scope)) { - _logger.fine('New batch due to'); - if (libraryUri != request.libraryUri) { - _logger.fine(' - library uri: $libraryUri != ${request.libraryUri}'); - } - if (isolateId != request.isolateId) { - _logger.fine(' - isolateId: $isolateId != ${request.isolateId}'); - } - if (!MapEquality().equals(scope, request.scope)) { - _logger.fine(' - scope: $scope != ${request.scope}'); - } - - unawaited(_evaluateBatch(currentRequests)); - currentRequests = []; - libraryUri = request.libraryUri; - isolateId = request.isolateId; - scope = request.scope; - } - currentRequests.add(request); - } - unawaited(_evaluateBatch(currentRequests)); - } - - Future _evaluateBatch(List requests) async { - if (requests.isEmpty) return; - - final first = requests.first; - if (requests.length == 1) { - if (first.completer.isCompleted) return; - return super - .evaluateExpression( - first.isolateId, first.libraryUri, first.expression, first.scope) - .then(requests.first.completer.complete); - } - - final expressions = requests.map((r) => r.expression).join(', '); - final batchedExpression = '[ $expressions ]'; - - _logger.fine('Evaluating batch of expressions $batchedExpression'); - - final RemoteObject list = await super.evaluateExpression( - first.isolateId, first.libraryUri, batchedExpression, first.scope); - - for (var i = 0; i < requests.length; i++) { - final request = requests[i]; - if (request.completer.isCompleted) continue; - _logger.fine('Getting result out of a batch for ${request.expression}'); - _debugger - .getProperties(list.objectId!, - offset: i, count: 1, length: requests.length) - .then((v) { - final result = v.first.value; - _logger.fine( - 'Got result out of a batch for ${request.expression}: $result'); - request.completer.complete(result); - }); - } - } -} - /// ExpressionEvaluator provides functionality to evaluate dart expressions /// from text user input in the debugger, using chrome remote debugger to /// collect context for evaluation (scope, types, modules), and using diff --git a/dwds/lib/src/web_utilities/batched_stream.dart b/dwds/lib/src/web_utilities/batched_stream.dart new file mode 100644 index 000000000..6da6465db --- /dev/null +++ b/dwds/lib/src/web_utilities/batched_stream.dart @@ -0,0 +1,85 @@ +// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'package:async/async.dart'; + +/// Stream controller allowing to batch events. +class BatchedStreamController { + static const _defaultBatchDelayMilliseconds = 1000; + static const _checkDelayMilliseconds = 100; + + final int _batchDelayMilliseconds; + + final StreamController _inputController; + late StreamQueue _inputQueue; + + final StreamController> _outputController; + final Completer _completer = Completer(); + + /// Create batched stream controller. + /// + /// Collects events from input [sink] and emits them in batches to the + /// output [stream] every [delay] milliseconds. Keeps the original order. + BatchedStreamController({ + int delay = _defaultBatchDelayMilliseconds, + }) : _batchDelayMilliseconds = delay, + _inputController = StreamController(), + _outputController = StreamController>() { + _inputQueue = StreamQueue(_inputController.stream); + unawaited(_batchAndSendEvents()); + } + + /// Sink collecting events. + StreamSink get sink => _inputController.sink; + + /// Output stream of batch events. + Stream> get stream => _outputController.stream; + + /// Close the controller. + Future close() async { + unawaited(_inputController.close()); + return _completer.future.then((value) => _outputController.close()); + } + + /// Send events to the output in a batch every [_batchDelayMilliseconds]. + Future _batchAndSendEvents() async { + const duration = Duration(milliseconds: _checkDelayMilliseconds); + final buffer = []; + + // Batch events every `_batchDelayMilliseconds`. + // + // Note that events might arrive at random intervals, so collecting + // a predetermined number of events to send in a batch might delay + // the batch indefinitely. Instead, check for new events every + // `_checkDelayMilliseconds` to make sure batches are sent in regular + // intervals. + var lastSendTime = DateTime.now().millisecondsSinceEpoch; + while (await _hasEventOrTimeOut(duration)) { + if (await _hasEventDuring(duration)) { + buffer.add(await _inputQueue.next); + } + + final now = DateTime.now().millisecondsSinceEpoch; + if (now > lastSendTime + _batchDelayMilliseconds) { + lastSendTime = now; + if (buffer.isNotEmpty) { + _outputController.sink.add(List.from(buffer)); + buffer.clear(); + } + } + } + + if (buffer.isNotEmpty) { + _outputController.sink.add(List.from(buffer)); + } + _completer.complete(true); + } + + Future _hasEventOrTimeOut(Duration duration) => + _inputQueue.hasNext.timeout(duration, onTimeout: () => true); + + Future _hasEventDuring(Duration duration) => + _inputQueue.hasNext.timeout(duration, onTimeout: () => false); +} diff --git a/dwds/web/client.dart b/dwds/web/client.dart index ac7eb7f3b..8fc88ad35 100644 --- a/dwds/web/client.dart +++ b/dwds/web/client.dart @@ -19,7 +19,10 @@ import 'package:dwds/data/register_event.dart'; import 'package:dwds/data/run_request.dart'; import 'package:dwds/data/serializers.dart'; import 'package:dwds/src/sockets.dart'; -import 'package:dwds/src/utilities/batched_stream.dart'; +// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart' +// makes dart2js skip creating background.js, so we use a copy instead. +// import 'package:dwds/src/utilities/batched_stream.dart'; +import 'package:dwds/src/web_utilities/batched_stream.dart'; import 'package:js/js.dart'; import 'package:sse/client/sse_client.dart'; import 'package:uuid/uuid.dart';