Skip to content

Commit f39caf1

Browse files
authored
node: some improvements (#408)
1 parent a215c85 commit f39caf1

File tree

11 files changed

+437
-126
lines changed

11 files changed

+437
-126
lines changed

src/lean_spec/__main__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from lean_spec.subspecs.networking.enr import ENR
4242
from lean_spec.subspecs.networking.gossipsub import GossipTopic
4343
from lean_spec.subspecs.networking.reqresp.message import Status
44-
from lean_spec.subspecs.node import Node, NodeConfig, get_local_validator_id
44+
from lean_spec.subspecs.node import Node, NodeConfig
4545
from lean_spec.subspecs.ssz.hash import hash_tree_root
4646
from lean_spec.subspecs.sync.checkpoint_sync import (
4747
CheckpointSyncError,
@@ -281,7 +281,7 @@ async def _init_from_checkpoint(
281281
#
282282
# The store treats this as the new "genesis" for fork choice purposes.
283283
# All blocks before the checkpoint are effectively pruned.
284-
validator_id = get_local_validator_id(validator_registry)
284+
validator_id = validator_registry.primary_index() if validator_registry else None
285285
store = Store.get_forkchoice_store(state, anchor_block, validator_id)
286286
logger.info(
287287
"Initialized from checkpoint at slot %d (finalized=%s)",
@@ -487,7 +487,7 @@ async def run_node(
487487
block_topic = str(GossipTopic.block(GOSSIP_FORK_DIGEST))
488488
event_source.subscribe_gossip_topic(block_topic)
489489
# Subscribe to attestation subnet topics based on local validator id.
490-
validator_id = get_local_validator_id(validator_registry)
490+
validator_id = validator_registry.primary_index() if validator_registry else None
491491
if validator_id is None:
492492
subnet_id = 0
493493
logger.info("No local validator id; subscribing to attestation subnet %d", subnet_id)

src/lean_spec/subspecs/networking/client/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
Bridges connection events to NetworkService events.
1414
"""
1515

16-
from .event_source import LiveNetworkEventSource
16+
from .event_source import EventSource, LiveNetworkEventSource
1717
from .reqresp_client import ReqRespClient
1818

1919
__all__ = [
20+
"EventSource",
2021
"LiveNetworkEventSource",
2122
"ReqRespClient",
2223
]

src/lean_spec/subspecs/networking/client/event_source.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import asyncio
104104
import logging
105105
from dataclasses import dataclass, field
106+
from typing import Protocol, Self
106107

107108
from lean_spec.snappy import SnappyDecompressionError, frame_decompress
108109
from lean_spec.subspecs.containers import SignedBlockWithAttestation
@@ -162,6 +163,27 @@
162163
logger = logging.getLogger(__name__)
163164

164165

166+
class EventSource(Protocol):
167+
"""Protocol for network event sources.
168+
169+
Defines the minimal interface needed by NetworkService.
170+
LiveNetworkEventSource satisfies this with real network I/O.
171+
MockEventSource satisfies this for testing.
172+
"""
173+
174+
def __aiter__(self) -> Self:
175+
"""Return self as async iterator."""
176+
...
177+
178+
async def __anext__(self) -> NetworkEvent:
179+
"""Yield the next network event."""
180+
...
181+
182+
async def publish(self, topic: str, data: bytes) -> None:
183+
"""Broadcast a message to all peers on a topic."""
184+
...
185+
186+
165187
class GossipMessageError(Exception):
166188
"""Raised when a gossip message cannot be processed."""
167189

src/lean_spec/subspecs/networking/service/service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from lean_spec.snappy import frame_compress
3030
from lean_spec.subspecs.containers import SignedBlockWithAttestation
3131
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
32-
from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource
32+
from lean_spec.subspecs.networking.client.event_source import EventSource
3333
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic
3434
from lean_spec.subspecs.networking.peer import PeerInfo
3535
from lean_spec.subspecs.networking.types import ConnectionState
@@ -70,7 +70,7 @@ class NetworkService:
7070
sync_service: SyncService
7171
"""Sync service that receives routed events."""
7272

73-
event_source: LiveNetworkEventSource
73+
event_source: EventSource
7474
"""Source of network events from the transport layer."""
7575

7676
fork_digest: str = field(default="0x00000000")
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Node orchestrator for the Lean Ethereum consensus client."""
22

3-
from .node import Node, NodeConfig, get_local_validator_id
3+
from .node import Node, NodeConfig
44

5-
__all__ = ["Node", "NodeConfig", "get_local_validator_id"]
5+
__all__ = ["Node", "NodeConfig"]

src/lean_spec/subspecs/node/node.py

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919

2020
from lean_spec.subspecs.api import ApiServer, ApiServerConfig
2121
from lean_spec.subspecs.chain import SlotClock
22-
from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT
22+
from lean_spec.subspecs.chain.config import (
23+
ATTESTATION_COMMITTEE_COUNT,
24+
INTERVALS_PER_SLOT,
25+
SECONDS_PER_SLOT,
26+
)
2327
from lean_spec.subspecs.chain.service import ChainService
2428
from lean_spec.subspecs.containers import Block, BlockBody, SignedBlockWithAttestation, State
2529
from lean_spec.subspecs.containers.attestation import SignedAttestation
@@ -29,13 +33,16 @@
2933
from lean_spec.subspecs.containers.validator import ValidatorIndex
3034
from lean_spec.subspecs.forkchoice import Store
3135
from lean_spec.subspecs.networking import NetworkService
32-
from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource
36+
from lean_spec.subspecs.networking.client.event_source import EventSource
3337
from lean_spec.subspecs.ssz.hash import hash_tree_root
3438
from lean_spec.subspecs.storage import Database, SQLiteDatabase
3539
from lean_spec.subspecs.sync import BlockCache, NetworkRequester, PeerManager, SyncService
3640
from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService
3741
from lean_spec.types import Bytes32, Uint64
3842

43+
_ZERO_TIME = Uint64(0)
44+
"""Default genesis time for database loading when no genesis time is available."""
45+
3946

4047
@dataclass(frozen=True, slots=True)
4148
class NodeConfig:
@@ -51,7 +58,7 @@ class NodeConfig:
5158
validators: Validators
5259
"""Initial validator set for genesis state."""
5360

54-
event_source: LiveNetworkEventSource
61+
event_source: EventSource
5562
"""Source of network events."""
5663

5764
network: NetworkRequester
@@ -95,6 +102,11 @@ class NodeConfig:
95102
"""
96103
Whether this node functions as an aggregator.
97104
105+
Aggregator selection is static (node-level flag), not VRF-based rotation.
106+
The spec assumes at least one aggregator node exists in the network.
107+
108+
With ATTESTATION_COMMITTEE_COUNT = 1, all validators share subnet 0.
109+
98110
When True:
99111
- The node performs attestation aggregation operations
100112
- The ENR advertises aggregator capability to peers
@@ -104,20 +116,6 @@ class NodeConfig:
104116
"""
105117

106118

107-
def get_local_validator_id(registry: ValidatorRegistry | None) -> ValidatorIndex | None:
108-
"""
109-
Get the validator index for this node.
110-
111-
For now, returns None as a default for passive nodes or simple setups.
112-
Future implementations will look up keys in the registry.
113-
"""
114-
if registry is None or len(registry) == 0:
115-
return None
116-
117-
# For simplicity, use the first validator in the registry.
118-
return registry.indices()[0]
119-
120-
121119
@dataclass(slots=True)
122120
class Node:
123121
"""
@@ -148,6 +146,9 @@ class Node:
148146
validator_service: ValidatorService | None = field(default=None)
149147
"""Optional validator service for block/attestation production."""
150148

149+
database: Database | None = field(default=None)
150+
"""Optional database reference for lifecycle management."""
151+
151152
_shutdown: asyncio.Event = field(default_factory=asyncio.Event)
152153
"""Event signaling shutdown request."""
153154

@@ -170,13 +171,17 @@ def from_genesis(cls, config: NodeConfig) -> Node:
170171
# The database is optional - nodes can run without persistence.
171172
database: Database | None = None
172173
if config.database_path is not None:
173-
database = cls._create_database(config.database_path)
174+
database = SQLiteDatabase(config.database_path)
174175

175176
#
176177
# If database contains valid state, resume from there.
177178
# Otherwise, fall through to genesis initialization.
178-
validator_id = get_local_validator_id(config.validator_registry)
179-
store = cls._try_load_from_database(database, validator_id)
179+
validator_id = (
180+
config.validator_registry.primary_index() if config.validator_registry else None
181+
)
182+
store = cls._try_load_from_database(
183+
database, validator_id, config.genesis_time, config.time_fn
184+
)
180185

181186
if store is None:
182187
# Generate genesis state from validators.
@@ -242,7 +247,7 @@ def from_genesis(cls, config: NodeConfig) -> Node:
242247
#
243248
# SyncService delegates aggregate publishing to NetworkService
244249
# via a callback, avoiding a circular dependency.
245-
sync_service._publish_agg_fn = network_service.publish_aggregated_attestation
250+
sync_service.set_publish_agg_fn(network_service.publish_aggregated_attestation)
246251

247252
# Create API server if configured
248253
api_server: ApiServer | None = None
@@ -261,17 +266,20 @@ def from_genesis(cls, config: NodeConfig) -> Node:
261266
# Wire callbacks to publish produced blocks/attestations to the network.
262267
validator_service: ValidatorService | None = None
263268
if config.validator_registry is not None:
264-
# Create a wrapper for publish_attestation that computes the subnet_id
265-
# from the validator_id in the attestation
269+
# These wrappers serve a dual purpose:
270+
#
271+
# 1. Publish to the network so peers receive the block/attestation.
272+
# 2. Process locally so the node's own store reflects what it produced.
273+
#
274+
# Without local processing, the node would not see its own produced
275+
# blocks/attestations in forkchoice until they arrived back via gossip.
266276
async def publish_attestation_wrapper(attestation: SignedAttestation) -> None:
267277
subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)
268278
await network_service.publish_attestation(attestation, subnet_id)
269-
# Also route locally so we can aggregate our own attestation
270279
await sync_service.on_gossip_attestation(attestation)
271280

272281
async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None:
273282
await network_service.publish_block(block)
274-
# Also route locally so we update our own store
275283
await sync_service.on_gossip_block(block, peer_id=None)
276284

277285
validator_service = ValidatorService(
@@ -290,35 +298,32 @@ async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None:
290298
network_service=network_service,
291299
api_server=api_server,
292300
validator_service=validator_service,
301+
database=database,
293302
)
294303

295-
@staticmethod
296-
def _create_database(path: Path | str) -> Database:
297-
"""
298-
Create database instance from path.
299-
300-
Args:
301-
path: Path to SQLite database file.
302-
303-
Returns:
304-
Database instance ready for use.
305-
"""
306-
# SQLite handles its own caching at the filesystem level.
307-
return SQLiteDatabase(path)
308-
309304
@staticmethod
310305
def _try_load_from_database(
311306
database: Database | None,
312307
validator_id: ValidatorIndex | None,
308+
genesis_time: Uint64 | None = None,
309+
time_fn: Callable[[], float] = time.time,
313310
) -> Store | None:
314311
"""
315312
Try to load forkchoice store from existing database state.
316313
317314
Returns None if database is empty or unavailable.
318315
316+
Uses wall-clock time to set the store's time field. This ensures that
317+
after a restart, the store reflects actual elapsed time rather than just
318+
the head block's proposal moment. Without this, the store would reject
319+
valid attestations as "too far in future" until the chain service ticks
320+
catch up.
321+
319322
Args:
320323
database: Database to load from.
321324
validator_id: Validator index for the store instance.
325+
genesis_time: Unix timestamp of genesis (slot 0).
326+
time_fn: Wall-clock time source.
322327
323328
Returns:
324329
Loaded Store or None if no valid state exists.
@@ -345,12 +350,24 @@ def _try_load_from_database(
345350
if justified is None or finalized is None:
346351
return None
347352

353+
# Compute store time from wall clock to avoid post-restart drift.
354+
#
355+
# Using only the head block's slot would set the store time to the
356+
# block's proposal moment. After a restart, this makes the store
357+
# think it's in the past, rejecting valid attestations as "future".
358+
# Instead, derive time from wall clock, floored by the block's slot.
359+
gt = genesis_time if genesis_time is not None else _ZERO_TIME
360+
elapsed_seconds = Uint64(max(0, int(time_fn()) - int(gt)))
361+
wall_clock_intervals = elapsed_seconds * INTERVALS_PER_SLOT // SECONDS_PER_SLOT
362+
block_intervals = head_block.slot * INTERVALS_PER_SLOT
363+
store_time = max(wall_clock_intervals, block_intervals)
364+
348365
# Reconstruct minimal store from persisted data.
349366
#
350367
# The store starts with just the head block and state.
351368
# Additional blocks can be loaded on demand or via sync.
352369
return Store(
353-
time=Uint64(head_block.slot * INTERVALS_PER_SLOT),
370+
time=store_time,
354371
config=head_state.config,
355372
head=head_root,
356373
safe_target=head_root,
@@ -383,14 +400,19 @@ async def run(self, *, install_signal_handlers: bool = True) -> None:
383400
# A separate task monitors the shutdown signal.
384401
# When triggered, it stops all services.
385402
# Once services exit, execution completes.
386-
async with asyncio.TaskGroup() as tg:
387-
tg.create_task(self.chain_service.run())
388-
tg.create_task(self.network_service.run())
389-
if self.api_server is not None:
390-
tg.create_task(self.api_server.run())
391-
if self.validator_service is not None:
392-
tg.create_task(self.validator_service.run())
393-
tg.create_task(self._wait_shutdown())
403+
# The finally block ensures the database is closed on shutdown.
404+
try:
405+
async with asyncio.TaskGroup() as tg:
406+
tg.create_task(self.chain_service.run())
407+
tg.create_task(self.network_service.run())
408+
if self.api_server is not None:
409+
tg.create_task(self.api_server.run())
410+
if self.validator_service is not None:
411+
tg.create_task(self.validator_service.run())
412+
tg.create_task(self._wait_shutdown())
413+
finally:
414+
if self.database is not None:
415+
self.database.close()
394416

395417
def _install_signal_handlers(self) -> None:
396418
"""

src/lean_spec/subspecs/sync/service.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,17 @@ class SyncService:
198198
Same buffering strategy as individual attestations.
199199
"""
200200

201+
def set_publish_agg_fn(
202+
self, fn: Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]]
203+
) -> None:
204+
"""Wire the aggregated attestation publisher after construction.
205+
206+
Breaks circular dependency between SyncService and NetworkService.
207+
NetworkService needs SyncService at construction, but SyncService
208+
needs NetworkService's publish method. This setter resolves the cycle.
209+
"""
210+
self._publish_agg_fn = fn
211+
201212
def __post_init__(self) -> None:
202213
"""Initialize sync components."""
203214
self._init_components()

src/lean_spec/subspecs/validator/registry.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,21 @@ def indices(self) -> ValidatorIndices:
202202
"""
203203
return ValidatorIndices(data=list(self._validators.keys()))
204204

205+
def primary_index(self) -> ValidatorIndex | None:
206+
"""
207+
Get the primary validator index for store-level identity.
208+
209+
Returns the first validator index in the registry.
210+
With ATTESTATION_COMMITTEE_COUNT = 1, all validators share subnet 0,
211+
so a single ID suffices for store-level operations.
212+
213+
Returns:
214+
First validator index, or None if registry is empty.
215+
"""
216+
if not self._validators:
217+
return None
218+
return next(iter(self._validators))
219+
205220
def __len__(self) -> int:
206221
"""Number of validators in the registry."""
207222
return len(self._validators)

0 commit comments

Comments
 (0)