Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cd5beb4
Initial save pre merge headers
Gabriel-Trintinalia Apr 26, 2025
c35fe15
Initial save pre merge headers
Gabriel-Trintinalia Apr 26, 2025
9a7258c
Add download headers parallelism flag, checkpoint and none validation…
Gabriel-Trintinalia Apr 27, 2025
0d1b0a1
Set chain head
Gabriel-Trintinalia Apr 27, 2025
c61d998
Simplify
Gabriel-Trintinalia Apr 27, 2025
1f85856
Introduce flag
Gabriel-Trintinalia Apr 27, 2025
a99c86c
Change default header parallelism to 4
Gabriel-Trintinalia Apr 27, 2025
b6d1823
Remove DOWNLOADER_HEADER_PARALLELISM_FLAG for now
Gabriel-Trintinalia Apr 27, 2025
6c6c461
Update javadoc
Gabriel-Trintinalia Apr 28, 2025
ba6e11b
Adjust flag name and option
Gabriel-Trintinalia Apr 28, 2025
6212a9f
Merge branch 'main' into sync-save-pre-merge-headers
Gabriel-Trintinalia Apr 28, 2025
8b46403
Revert noneHeaderValidationPolicy change
Gabriel-Trintinalia Apr 28, 2025
5995e0f
Merge branch 'sync-save-pre-merge-headers' of https://github.com/Gabr…
Gabriel-Trintinalia Apr 28, 2025
3ef30f1
Add unit tests
Gabriel-Trintinalia Apr 28, 2025
7b61541
Undo change
Gabriel-Trintinalia Apr 28, 2025
d5692e9
Pass constant to test
Gabriel-Trintinalia Apr 28, 2025
6c3f076
Accept PR suggestions
Gabriel-Trintinalia Apr 29, 2025
74e6aba
Accept PR suggestions
Gabriel-Trintinalia Apr 29, 2025
d7fac60
Merge branch 'main' into sync-save-pre-merge-headers
Gabriel-Trintinalia Apr 29, 2025
71e8fcc
Accept PR suggestions
Gabriel-Trintinalia Apr 29, 2025
6b4c69a
Accept PR suggestions
Gabriel-Trintinalia Apr 29, 2025
600bb7b
Merge branch 'main' into sync-save-pre-merge-headers
Gabriel-Trintinalia Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.cli.options;

import static org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration.DEFAULT_SAVE_PRE_MERGE_HEADERS_ONLY_ENABLED;

import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.ImmutableSnapSyncConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.snapsync.SnapSyncConfiguration;
Expand Down Expand Up @@ -87,6 +89,9 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration

private static final String SNAP_SYNC_BFT_ENABLED_FLAG = "--Xsnapsync-bft-enabled";

private static final String SAVE_PRE_MERGE_HEADERS_ONLY_FLAG =
"--Xsynchronizer-downloader-pre-merge-headers-only-enabled";

/**
* Parse block propagation range.
*
Expand Down Expand Up @@ -333,6 +338,15 @@ public void parseBlockPropagationRange(final String arg) {
private Boolean snapTransactionIndexingEnabled =
SnapSyncConfiguration.DEFAULT_SNAP_SYNC_TRANSACTION_INDEXING_ENABLED;

@CommandLine.Option(
names = SAVE_PRE_MERGE_HEADERS_ONLY_FLAG,
paramLabel = "<Boolean>",
hidden = true,
arity = "0..1",
description =
"Enable the downloader to save only headers for pre-merge blocks. (default: ${DEFAULT-VALUE})")
private Boolean savePreMergeHeadersOnlyEnabled = DEFAULT_SAVE_PRE_MERGE_HEADERS_ONLY_ENABLED;

private SynchronizerOptions() {}

/**
Expand Down Expand Up @@ -414,6 +428,7 @@ public static SynchronizerOptions fromConfig(final SynchronizerConfiguration con
options.snapsyncServerEnabled = config.getSnapSyncConfiguration().isSnapServerEnabled();
options.snapTransactionIndexingEnabled =
config.getSnapSyncConfiguration().isSnapSyncTransactionIndexingEnabled();
options.savePreMergeHeadersOnlyEnabled = config.isSavePreMergeHeadersOnlyEnabled();
return options;
}

Expand Down Expand Up @@ -450,6 +465,7 @@ public SynchronizerConfiguration.Builder toDomainObject() {
.build());
builder.checkpointPostMergeEnabled(checkpointPostMergeSyncEnabled);
builder.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled);
builder.savePreMergeHeadersOnlyEnabled(savePreMergeHeadersOnlyEnabled);
return builder;
}

Expand Down Expand Up @@ -506,7 +522,9 @@ public List<String> getCLIOptions() {
SNAP_SERVER_ENABLED_FLAG,
OptionParser.format(snapsyncServerEnabled),
SNAP_TRANSACTION_INDEXING_ENABLED_FLAG,
OptionParser.format(snapTransactionIndexingEnabled));
OptionParser.format(snapTransactionIndexingEnabled),
SAVE_PRE_MERGE_HEADERS_ONLY_FLAG,
OptionParser.format(savePreMergeHeadersOnlyEnabled));
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ protected SynchronizerConfiguration.Builder createCustomizedDomainObject() {
SnapSyncConfiguration.DEFAULT_BYTECODE_COUNT_PER_REQUEST + 2)
.isSnapServerEnabled(Boolean.TRUE)
.isSnapSyncTransactionIndexingEnabled(Boolean.TRUE)
.build());
.build())
.savePreMergeHeadersOnlyEnabled(
SnapSyncConfiguration.DEFAULT_SAVE_PRE_MERGE_HEADERS_ONLY_ENABLED);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions config/src/main/resources/sepolia.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
]
},
"checkpoint": {
"hash": "0x491ebac1b7f9c0eb426047a495dc577140cb3e09036cd3f7266eda86b635d9fa",
"number": 1273020,
"totalDifficulty": "0x13DE1653E7D280"
"hash": "0x36fb89fba5b7857cf0ca78b5a9625b4043ff4555dfce9b7bcdcdd758a11eb946",
"number": 1735371,
"totalDifficulty": "0x3c656d23029ab0"
}
},
"alloc":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,14 @@ private DefaultBlockchain(
final Hash chainHead = blockchainStorage.getChainHead().get();
chainHeader = blockchainStorage.getBlockHeader(chainHead).get();
totalDifficulty = blockchainStorage.getTotalDifficulty(chainHead).get();
final BlockBody chainHeadBody = blockchainStorage.getBlockBody(chainHead).get();
chainHeadTransactionCount = chainHeadBody.getTransactions().size();
chainHeadOmmerCount = chainHeadBody.getOmmers().size();

blockchainStorage
.getBlockBody(chainHead)
.ifPresent(
headBlockBody -> {
chainHeadTransactionCount = headBlockBody.getTransactions().size();
chainHeadOmmerCount = headBlockBody.getOmmers().size();
});

this.reorgLoggingThreshold = reorgLoggingThreshold;
this.blockChoiceRule = heaviestChainBlockChoiceRule;
Expand Down Expand Up @@ -433,6 +438,23 @@ public synchronized void storeBlock(final Block block, final List<TransactionRec
appendBlockHelper(new BlockWithReceipts(block, receipts), true, true);
}

@Override
public void unsafeStoreHeader(
final BlockHeader blockHeader,
final Difficulty totalDifficulty,
final boolean updateChainHead) {
final BlockchainStorage.Updater updater = blockchainStorage.updater();
updater.putBlockHeader(blockHeader.getHash(), blockHeader);
updater.putBlockHash(blockHeader.getNumber(), blockHeader.getBlockHash());
updater.putTotalDifficulty(blockHeader.getHash(), totalDifficulty);
if (updateChainHead) {
this.chainHeader = blockHeader;
this.totalDifficulty = totalDifficulty;
updater.setChainHead(blockHeader.getBlockHash());
}
updater.commit();
}

private void cacheBlockData(final Block block, final List<TransactionReceipt> receipts) {
blockHeadersCache.ifPresent(cache -> cache.put(block.getHash(), block.getHeader()));
blockBodiesCache.ifPresent(cache -> cache.put(block.getHash(), block.getBody()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ public interface MutableBlockchain extends Blockchain {
*/
void storeBlock(Block block, List<TransactionReceipt> receipts);

/**
* Store a block header to the blockchain, updating the chain state if necessary.
*
* @param blockHeader The block header to store.
*/
void unsafeStoreHeader(
BlockHeader blockHeader, Difficulty totalDifficulty, boolean updateChainHead);

void unsafeImportBlock(
final Block block,
final List<TransactionReceipt> receipts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface DownloadPipelineFactory {
* @param target the target the chain download is working to catch up to.
* @return the created but not yet started pipeline.
*/
Pipeline<?> createDownloadPipelineForSyncTarget(SyncTarget target);
Pipeline<?> createDownloadPipelineForSyncTarget(SyncState syncState, SyncTarget target);

CompletionStage<Void> startPipeline(
EthScheduler scheduler, SyncState syncState, SyncTarget syncTarget, Pipeline<?> pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ private synchronized CompletionStage<Void> startDownloadForSyncTarget(final Sync
.addArgument(() -> target.commonAncestor().getNumber())
.addArgument(() -> target.commonAncestor().getBlockHash())
.log();
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);
currentDownloadPipeline =
downloadPipelineFactory.createDownloadPipelineForSyncTarget(syncState, target);
return downloadPipelineFactory.startPipeline(
scheduler, syncState, target, currentDownloadPipeline);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.sync;

import static org.hyperledger.besu.util.log.LogUtil.throttledLog;

import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Difficulty;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A step in the synchronization process that saves historical block headers. */
public class SavePreMergeHeadersStep implements Function<BlockHeader, Stream<BlockHeader>> {
private static final Logger LOG = LoggerFactory.getLogger(SavePreMergeHeadersStep.class);
private final MutableBlockchain blockchain;
private final long preMergeBlockNumber;

private final AtomicBoolean shouldLog = new AtomicBoolean(true);
private static final int LOG_REPEAT_DELAY_SECONDS = 30;
private static final int LOG_PROGRESS_INTERVAL = 1000;

public SavePreMergeHeadersStep(
final MutableBlockchain blockchain, final long preMergeBlockNumber) {
this.blockchain = blockchain;
this.preMergeBlockNumber = preMergeBlockNumber;
}

@Override
public Stream<BlockHeader> apply(final BlockHeader blockHeader) {
long blockNumber = blockHeader.getNumber();
if (isPostMergeBlock(blockNumber)) {
return Stream.of(blockHeader);
}
storeBlockHeader(blockHeader);
logProgress(blockHeader);
return Stream.empty();
}

private boolean isPostMergeBlock(final long blockNumber) {
return preMergeBlockNumber <= 0 || blockNumber > preMergeBlockNumber;
}

private void storeBlockHeader(final BlockHeader blockHeader) {
Difficulty difficulty = blockchain.calculateTotalDifficulty(blockHeader);
blockchain.unsafeStoreHeader(blockHeader, difficulty, true);
}

private void logProgress(final BlockHeader blockHeader) {
if (blockHeader.getNumber() == preMergeBlockNumber) {
LOG.info("Pre-merge headers import completed at block {}", blockHeader.toLogString());
} else {
long blockNumber = blockHeader.getNumber();
if (blockNumber % LOG_PROGRESS_INTERVAL == 0) {
double importPercent = (double) (100 * blockNumber) / preMergeBlockNumber;
throttledLog(
LOG::info,
String.format(
"Pre-merge headers import progress: %d of %d (%.2f%%)",
blockNumber, preMergeBlockNumber, importPercent),
shouldLog,
LOG_REPEAT_DELAY_SECONDS);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class SynchronizerConfiguration {
private final long worldStateMinMillisBeforeStalling;
private final long propagationManagerGetBlockTimeoutMillis;
private final boolean isPeerTaskSystemEnabled;
private final boolean savePreMergeHeadersOnlyEnabled;

private SynchronizerConfiguration(
final int syncPivotDistance,
Expand All @@ -110,7 +111,8 @@ private SynchronizerConfiguration(
final int maxTrailingPeers,
final long propagationManagerGetBlockTimeoutMillis,
final boolean checkpointPostMergeEnabled,
final boolean isPeerTaskSystemEnabled) {
final boolean isPeerTaskSystemEnabled,
final boolean savePreMergeHeadersOnlyEnabled) {
this.syncPivotDistance = syncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.syncMinimumPeerCount = syncMinimumPeerCount;
Expand All @@ -134,6 +136,7 @@ private SynchronizerConfiguration(
this.propagationManagerGetBlockTimeoutMillis = propagationManagerGetBlockTimeoutMillis;
this.checkpointPostMergeEnabled = checkpointPostMergeEnabled;
this.isPeerTaskSystemEnabled = isPeerTaskSystemEnabled;
this.savePreMergeHeadersOnlyEnabled = savePreMergeHeadersOnlyEnabled;
}

public static Builder builder() {
Expand Down Expand Up @@ -263,6 +266,10 @@ public boolean isPeerTaskSystemEnabled() {
return isPeerTaskSystemEnabled;
}

public boolean isSavePreMergeHeadersOnlyEnabled() {
return savePreMergeHeadersOnlyEnabled;
}

public static class Builder {
private SyncMode syncMode = SyncMode.FULL;
private int syncMinimumPeerCount = DEFAULT_SYNC_MINIMUM_PEERS;
Expand All @@ -288,6 +295,7 @@ public static class Builder {
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
private int worldStateTaskCacheSize = DEFAULT_WORLD_STATE_TASK_CACHE_SIZE;
private boolean isPeerTaskSystemEnabled = false;
private boolean savePreMergeHeadersOnlyEnabled = false;

private long propagationManagerGetBlockTimeoutMillis =
DEFAULT_PROPAGATION_MANAGER_GET_BLOCK_TIMEOUT_MILLIS;
Expand Down Expand Up @@ -419,6 +427,11 @@ public Builder isPeerTaskSystemEnabled(final boolean isPeerTaskSystemEnabled) {
return this;
}

public Builder savePreMergeHeadersOnlyEnabled(final boolean savePreMergeHeadersOnlyEnabled) {
this.savePreMergeHeadersOnlyEnabled = savePreMergeHeadersOnlyEnabled;
return this;
}

public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
syncPivotDistance,
Expand All @@ -443,7 +456,8 @@ public SynchronizerConfiguration build() {
maxTrailingPeers,
propagationManagerGetBlockTimeoutMillis,
checkpointPostMergeEnabled,
isPeerTaskSystemEnabled);
isPeerTaskSystemEnabled,
savePreMergeHeadersOnlyEnabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hyperledger.besu.ethereum.eth.sync.DownloadBodiesStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.SavePreMergeHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.range.RangeHeadersFetcher;
Expand Down Expand Up @@ -112,8 +113,8 @@ public CompletionStage<Void> startPipeline(
}

@Override
public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncTarget target) {

public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(
final SyncState syncState, final SyncTarget target) {
final int downloaderParallelism = syncConfig.getDownloaderParallelism();
final int headerRequestSize = syncConfig.getDownloaderHeaderRequestSize();
final int singleHeaderBufferSize = headerRequestSize * downloaderParallelism;
Expand All @@ -139,6 +140,9 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
metricsSystem);
final RangeHeadersValidationStep validateHeadersJoinUpStep =
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
final SavePreMergeHeadersStep savePreMergeHeadersStep =
new SavePreMergeHeadersStep(
protocolContext.getBlockchain(), getPreMergeHeaderBlockNumber(syncState));
final DownloadBodiesStep downloadBodiesStep =
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
final DownloadReceiptsStep downloadReceiptsStep =
Expand Down Expand Up @@ -167,6 +171,7 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
"fastSync")
.thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism)
.thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize)
.thenFlatMap("savePreMergeHeadersStep", savePreMergeHeadersStep, singleHeaderBufferSize)
.inBatches(headerRequestSize)
.thenProcessAsyncOrdered("downloadBodies", downloadBodiesStep, downloaderParallelism)
.thenProcessAsyncOrdered("downloadReceipts", downloadReceiptsStep, downloaderParallelism)
Expand Down Expand Up @@ -194,4 +199,10 @@ protected boolean shouldContinueDownloadingFromPeer(
}
return shouldContinue;
}

private long getPreMergeHeaderBlockNumber(final SyncState syncState) {
return syncConfig.isSavePreMergeHeadersOnlyEnabled()
? syncState.getCheckpoint().map(checkpoint -> checkpoint.blockNumber() - 1).orElse(0L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially a breaking change if any private networks make use of the existing checkpoint config?

We're kind of overloading the checkpoint concept, which I think makes sense for public networks as long as we're explicit about it. We should check usage for private.
In full 4444s, with rolling window, I guess this static checkpoint goes away entirely for public nets?

: 0L;
}
}
Loading