-
Notifications
You must be signed in to change notification settings - Fork 0
Speed up streamed-proto query output by distributing work to multiple threads #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6bd312b
41fbacf
0c9a1d2
913d4a3
9843a5e
9a0efa0
1852be0
89e8b3b
5fc8b13
ec1bf6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,18 +13,26 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
// limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||
package com.google.devtools.build.lib.query2.query.output; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.common.collect.Iterables; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.packages.LabelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.packages.Target; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.query2.proto.proto2api.Build; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.protobuf.CodedOutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
import java.io.IOException; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import java.io.OutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.*; | ||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||
* An output formatter that outputs a protocol buffer representation of a query result and outputs | ||||||||||||||||||||||||||||||||||||||||||||||||||
* the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()} | ||||||||||||||||||||||||||||||||||||||||||||||||||
* on a {@code Build.QueryResult} object the full result can be reconstructed. | ||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||
public class StreamedProtoOutputFormatter extends ProtoOutputFormatter { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
public String getName() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return "streamed_proto"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -34,13 +42,120 @@ public String getName() { | |||||||||||||||||||||||||||||||||||||||||||||||||
public OutputFormatterCallback<Target> createPostFactoStreamCallback( | ||||||||||||||||||||||||||||||||||||||||||||||||||
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return new OutputFormatterCallback<Target>() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2; | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Magic Numbers UsageThe code uses a magic number (500) for chunk size without explanation of why this value was chosen. This reduces maintainability as future developers won't understand the reasoning behind this specific value or know when it should be adjusted. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private static final int TARGETS_PER_CHUNK = 500; | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Magic Number UsageMagic number 500 lacks explanation for its selection. This hinders maintainability as future developers cannot understand the rationale for this specific batch size value.
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private final LabelPrinter ourLabelPrinter = labelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
public void processOutput(Iterable<Target> partialResult) | ||||||||||||||||||||||||||||||||||||||||||||||||||
throws IOException, InterruptedException { | ||||||||||||||||||||||||||||||||||||||||||||||||||
for (Target target : partialResult) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out); | ||||||||||||||||||||||||||||||||||||||||||||||||||
ForkJoinTask<?> writeAllTargetsFuture; | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Management Issue: The ForkJoinPool is created with parallelism equal to all available processors without any safeguards against resource exhaustion. On many-core systems, this could lead to excessive thread creation and CPU contention. Consider limiting the maximum parallelism and adding an exception handler: // Limit the number of threads to avoid resource exhaustion
int parallelism = Math.min(Runtime.getRuntime().availableProcessors(), 4);
try (ForkJoinPool executor = new ForkJoinPool(
parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(thread, throwable) -> {
// Log uncaught exceptions in worker threads
System.err.println("Worker thread exception: " + throwable);
},
// we use asyncMode to ensure the queue is processed FIFO
true)) { |
||||||||||||||||||||||||||||||||||||||||||||||||||
try (ForkJoinPool executor = | ||||||||||||||||||||||||||||||||||||||||||||||||||
new ForkJoinPool( | ||||||||||||||||||||||||||||||||||||||||||||||||||
Runtime.getRuntime().availableProcessors(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
ForkJoinPool.defaultForkJoinWorkerThreadFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||
null, | ||||||||||||||||||||||||||||||||||||||||||||||||||
// we use asyncMode to ensure the queue is processed FIFO, which maximizes | ||||||||||||||||||||||||||||||||||||||||||||||||||
// throughput | ||||||||||||||||||||||||||||||||||||||||||||||||||
true)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+54
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unclosed ForkJoinPoolThe ForkJoinPool is created with try-with-resources but the Future's completion is awaited after the try block exits, risking premature pool shutdown before task completion.
Standards
Comment on lines
+54
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Pool LifecycleCreating a new ForkJoinPool for each processOutput call is expensive. Each pool creation allocates thread resources that must be initialized and later destroyed. Under high query volume, this pattern creates significant thread churn and initialization overhead, potentially degrading overall system performance.
Standards
Comment on lines
+54
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Leak RiskThe ForkJoinTask created inside the try-with-resources block continues execution after the ForkJoinPool is closed. This creates a resource leak risk as tasks may attempt to use the shutdown executor, potentially causing RejectedExecutionException or task failures.
Commitable Suggestion
Suggested change
Standards
Comment on lines
+54
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Leak RiskForkJoinPool is created with no exception handler (null parameter). Uncaught exceptions in worker threads may cause thread termination without proper cleanup, potentially reducing parallelism effectiveness under load and causing resource leaks. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE); | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Exhaustion RiskThe queue has a bounded size but no handling for queue full conditions. If producers outpace consumers, the put() operation will block indefinitely, potentially causing thread starvation or deadlock. Consider adding timeout mechanisms and graceful degradation when queue capacity is reached. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
var stillAddingTargetsToQueue = new AtomicBoolean(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||
writeAllTargetsFuture = | ||||||||||||||||||||||||||||||||||||||||||||||||||
executor.submit( | ||||||||||||||||||||||||||||||||||||||||||||||||||
() -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
Future<List<byte[]>> targets = targetQueue.take(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
for (byte[] target : targets.get()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
out.write(target); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+55
to
+72
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion ForkJoinPool worker is blocking on The consumer task runs inside the same
Because Recommended approaches:
This will eliminate the starvation risk and make behaviour more predictable. |
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+68
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deadlock PotentialThe consumer thread calls targetQueue.take() which blocks until an element is available. If an exception occurs during queue population and stillAddingTargetsToQueue remains true while the queue is empty, the consumer thread will deadlock waiting indefinitely for items that will never arrive.
Commitable Suggestion
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+74
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Interruption HandlingInterruptedException is caught but the thread's interrupted status is not restored. When wrapping InterruptedException, the current thread's interrupt status should be preserved with Thread.currentThread().interrupt() to ensure proper interruption propagation.
Commitable Suggestion
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Safety Issue: The ExecutionException handling is inadequate, using a generic RuntimeException with a TODO comment. This can mask important exceptions during task execution, potentially leading to resource leaks or thread pool exhaustion. Consider unwrapping and properly handling the cause: catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt(); // Preserve interrupt status
throw new WrappedInterruptedException((InterruptedException) cause);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("Error processing targets", cause);
}
} |
||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: figure out what might be in here and propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+78
to
+80
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception Swallowing RiskGeneric exception handling wraps ExecutionException in RuntimeException without preserving cause details. Attackers could exploit swallowed exceptions to hide malicious activities. Proper exception handling needed for security visibility.
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+79
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This Also, consider logging the exception with sufficient context to aid debugging. // TODO: figure out what might be in here and propagate
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
throw new WrappedInterruptedException((InterruptedException) cause);
} else {
throw new RuntimeException("Error during target processing", cause);
}
Comment on lines
+78
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Error PropagationExecutionException caught with TODO comment and generic RuntimeException wrapping. Specific error information is lost, preventing proper error handling and recovery.
Standards
Comment on lines
+78
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uncaught RuntimeException PropagationUnresolved TODO with generic RuntimeException wrapping can mask actual failure causes. The code throws a generic RuntimeException without proper cause analysis, potentially hiding important error information and making debugging difficult in production.
Commitable Suggestion
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Concurrency Issue: The blocking Consider using a non-blocking approach with timeouts: // Use offer with timeout to prevent indefinite blocking
Future<List<byte[]>> task = executor.submit(() -> writeTargetsDelimitedToByteArrays(targets));
if (!targetQueue.offer(task, 5, TimeUnit.SECONDS)) {
// If queue is full for too long, process this batch directly to make progress
for (byte[] target : task.get(30, TimeUnit.SECONDS)) {
out.write(target);
}
} |
||||||||||||||||||||||||||||||||||||||||||||||||||
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets))); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||
stillAddingTargetsToQueue.set(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+88
to
+90
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After setting } finally {
stillAddingTargetsToQueue.set(false);
executor.shutdown(); // Initiate shutdown
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for tasks to complete
System.err.println("ForkJoinPool did not terminate in the specified timeout.");
// Optionally, log the state of the tasks that are still running.
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
} |
||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
writeAllTargetsFuture.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+94
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the previous } catch (ExecutionException e) {
// TODO: propagate
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
throw new WrappedInterruptedException((InterruptedException) cause);
} else {
throw new RuntimeException("Error during target processing", cause);
}
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception Handling GapThe code catches ExecutionException but wraps it in a generic RuntimeException without preserving the original exception type. This loses important error context and makes debugging harder.
Suggested change
Standards
Comment on lines
+92
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uncaught ExceptionsExecutionException is caught but wrapped in RuntimeException with a TODO comment. This loses specific error information and prevents proper error handling.
Standards
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled Interrupted ExceptionExecutionException handling doesn't restore interrupted status when InterruptedException is the cause. Thread interruption signals could be lost, preventing proper thread termination. Security-sensitive operations might continue after intended shutdown.
Standards
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled ExecutionException PropagationAnother unresolved TODO with generic RuntimeException wrapping. The code fails to properly analyze and handle the underlying cause of ExecutionException, which could contain important failure information needed for proper error handling and recovery.
Commitable Suggestion
Suggested change
Standards
Comment on lines
+91
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled Interrupted ExceptionThe code catches ExecutionException but doesn't handle InterruptedException from Future.get(). Thread interruption signals are lost, potentially causing thread leaks and preventing proper application shutdown. This can lead to resource exhaustion in long-running applications. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return targets.stream().map(target -> writeDelimited(toProto(target))).toList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+99
to
+100
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Memory Exhaustion RiskThe method processes all targets in memory without streaming or chunking large outputs. For large target lists, this could cause excessive memory consumption leading to OutOfMemoryError. Consider implementing true streaming with backpressure to prevent memory exhaustion. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private Build.Target toProto(Target target) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return toTargetProtoBuffer(target, ourLabelPrinter); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private static byte[] writeDelimited(Build.Target targetProtoBuffer) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
var serializedSize = targetProtoBuffer.getSerializedSize(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
- var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
- var output = new byte[headerSize + serializedSize]; | ||||||||||||||||||||||||||||||||||||||||||||||||||
- var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
- targetProtoBuffer.writeTo(codedOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ byte[] output = new byte[headerSize + serializedSize]; | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ // 1. write the var-int length prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ headerOut.writeUInt32NoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ headerOut.flush(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ // 2. write the message bytes immediately after the prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ CodedOutputStream bodyOut = | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ CodedOutputStream.newInstance(output, headerSize, serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ targetProtoBuffer.writeTo(bodyOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||
+ bodyOut.flush(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+131
to
+132
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Synchronization MissingMultiple threads write to the same output stream without synchronization. Concurrent writes to the output stream can cause data corruption as bytes from different threads interleave unpredictably.
Standards
Comment on lines
+124
to
+132
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unclosed Resource LeakCodedOutputStream instances are not properly closed. The streams implement Closeable interface and should be closed to prevent resource leaks, especially in high-throughput concurrent processing.
Standards
Comment on lines
+124
to
+132
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant CodedOutputStream CreationCreating separate CodedOutputStream instances for header and body adds overhead. A single CodedOutputStream with appropriate positioning would be more efficient, reducing object creation and improving serialization performance under high throughput. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
return output; | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+134
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Syntax ErrorSyntax error in the writeDelimited method. The code has an extra closing bracket after 'return output;' but before the catch block, which will prevent compilation and cause build failures. This breaks the try-catch structure.
Commitable Suggestion
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+134
to
+137
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unclosed Resources RiskThe method has an extra closing brace that creates a syntax error. This will cause compilation failure and prevent the code from functioning properly.
Suggested change
Standards
Comment on lines
+134
to
+137
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Syntax ErrorSyntax error with mismatched curly braces. The extra closing brace on line 135 breaks the method structure, causing compilation failure.
Standards
Comment on lines
+134
to
+137
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Broken Exception ChainThe writeDelimited method has mismatched braces causing a logical syntax error. The extra closing brace at line 135 will prevent compilation, and the exception handling block at lines 136-137 is improperly structured, breaking the intended error propagation flow. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private static class WrappedIOException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||
private WrappedIOException(IOException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
public IOException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return (IOException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+145
to
+147
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider whether wrapping the @Override
public IOException getCause() {
return cause;
} |
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
private static class WrappedInterruptedException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||
private WrappedInterruptedException(InterruptedException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
public InterruptedException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
return (InterruptedException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+157
to
+159
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to @Override
public InterruptedException getCause() {
return cause;
} |
||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+140
to
+160
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrapper Classes DuplicationTwo nearly identical wrapper exception classes follow the same pattern with duplicated code structure. This violates DRY principle and increases maintenance burden. A generic wrapper with type parameter or unified exception handling strategy would be more maintainable. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nested Anonymous Classes
Complex parallel processing logic is implemented within an anonymous inner class. This creates maintainability issues as the code is deeply nested and tightly coupled to its parent class. Extracting this to a named class would improve readability and testability.
Standards