Skip to content

Commit c79051d

Browse files
authored
Body Validation Fallback (#143)
1 parent 8e4463c commit c79051d

2 files changed

Lines changed: 63 additions & 13 deletions

File tree

core/src/main/java/samba/network/history/HistoryNetwork.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import java.util.ArrayList;
4545
import java.util.Arrays;
46+
import java.util.HashSet;
4647
import java.util.List;
4748
import java.util.Objects;
4849
import java.util.Optional;
@@ -449,17 +450,44 @@ private Optional<ContentBlockHeader> getAssociatedBlockHeader(
449450
if (blockHeaderBytes.isPresent()) {
450451
return ContentUtil.createBlockHeaderfromSszBytes(blockHeaderBytes.get());
451452
}
452-
453-
Optional<FindContentResult> searchResult = getContent(blockHeaderKey, DEFAULT_TIMEOUT);
454-
if (searchResult.isPresent()) {
455-
Optional<ContentBlockHeader> blockHeader =
456-
ContentUtil.createBlockHeaderfromSszBytes(
457-
Bytes.fromHexString(searchResult.get().getContent()));
458-
this.store(
459-
blockHeaderKeySsz, blockHeader.get().getSszBytes()); // Store newly located block header
460-
return blockHeader;
453+
int SEARCH_ATTEMPTS = 3;
454+
Set<NodeRecord> excludedNodes = new HashSet<>();
455+
for (int i = 0; i < SEARCH_ATTEMPTS; i++) {
456+
RecursiveLookupTaskFindContent task =
457+
new RecursiveLookupTaskFindContent(
458+
this,
459+
blockHeaderKey.getSszBytes(),
460+
this.discv5Client.getHomeNodeRecord().getNodeId(),
461+
getFoundNodes(blockHeaderKey),
462+
DEFAULT_TIMEOUT);
463+
Optional<FindContentResult> searchResult = task.execute().join();
464+
465+
if (searchResult.isPresent()) {
466+
Optional<ContentBlockHeader> blockHeader =
467+
ContentUtil.createBlockHeaderfromSszBytes(
468+
Bytes.fromHexString(searchResult.get().getContent()));
469+
if (this.store(blockHeaderKeySsz, blockHeader.get().getSszBytes())) {
470+
LOG.debug(
471+
"Found and stored block header for content key {}: {}",
472+
contentKey,
473+
blockHeaderKey.getSszBytes().toHexString());
474+
this.gossip(
475+
task.getInterestedNodes(),
476+
blockHeaderKey.getSszBytes(),
477+
Bytes.fromHexString(searchResult.get().getContent())); // POKE Mechanism
478+
return blockHeader; // Store newly located block header
479+
}
480+
LOG.debug(
481+
"Found Header for content key {} invalid, trying other peers",
482+
contentKey,
483+
blockHeaderKey);
484+
}
485+
if (task.getRespondingNode().isPresent()) excludedNodes.add(task.getRespondingNode().get());
461486
}
462-
487+
LOG.debug(
488+
"Could not find associated block header for content key {} after {} attempts",
489+
contentKey,
490+
SEARCH_ATTEMPTS);
463491
return Optional.empty();
464492
} catch (Exception e) {
465493
LOG.debug("Error when retrieving associated block header for {}", contentKey, e);

core/src/main/java/samba/services/search/RecursiveLookupTaskFindContent.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import samba.domain.messages.requests.FindContent;
99
import samba.network.history.HistoryNetwork;
1010

11+
import java.util.Collections;
1112
import java.util.HashSet;
1213
import java.util.List;
1314
import java.util.Optional;
@@ -36,18 +37,31 @@ public class RecursiveLookupTaskFindContent {
3637
private Optional<FindContentResult> content = Optional.empty();
3738
private final int timeout;
3839
private final Set<NodeRecord> interestedNodes = new HashSet<>();
40+
private final Set<NodeRecord> excludedNodes = new HashSet<>();
41+
private Optional<NodeRecord> respondingNode;
3942

4043
public RecursiveLookupTaskFindContent(
4144
final HistoryNetwork historyNetwork,
4245
final Bytes contentKey,
4346
final Bytes homeNodeId,
4447
final Set<NodeRecord> foundNodes,
4548
final int timeout) {
49+
this(historyNetwork, contentKey, homeNodeId, foundNodes, Set.of(), timeout);
50+
}
51+
52+
public RecursiveLookupTaskFindContent(
53+
final HistoryNetwork historyNetwork,
54+
final Bytes contentKey,
55+
final Bytes homeNodeId,
56+
final Set<NodeRecord> foundNodes,
57+
final Set<NodeRecord> excludedNodes,
58+
final int timeout) {
4659
this.historyNetwork = historyNetwork;
4760
this.contentKey = contentKey;
4861
this.queriedNodeIds.add(homeNodeId);
4962
this.foundNodes.addAll(foundNodes);
5063
this.timeout = timeout;
64+
this.excludedNodes.addAll(excludedNodes);
5165
}
5266

5367
public CompletableFuture<Optional<FindContentResult>> execute() {
@@ -61,6 +75,7 @@ private synchronized void sendRequests() {
6175
return;
6276
}
6377
if (content.isPresent()) {
78+
LOG.error("No nodes left to query");
6479
future.complete(content);
6580
return;
6681
}
@@ -72,7 +87,7 @@ private synchronized void sendRequests() {
7287
.collect(Collectors.toList());
7388

7489
if (nodesToQuery.isEmpty()) {
75-
future.completeExceptionally(new RuntimeException("No nodes left to query."));
90+
future.complete(content);
7691
return;
7792
}
7893

@@ -99,14 +114,17 @@ private void queryPeer(final NodeRecord peer) {
99114
LOG.debug("Node {} returned empty result", peer.getNodeId());
100115
} else {
101116
FindContentResult contentResult = result.get();
102-
if (contentResult.getContent() != null) {
117+
if (contentResult.getContent() != null && !excludedNodes.contains(peer)) {
118+
this.respondingNode = Optional.of(peer);
103119
content = Optional.of(contentResult);
104120
future.complete(content);
105121
return;
106122
}
107123
// Add nodes to foundNodes and continue searching
108124
foundNodes.addAll(
109-
contentResult.getEnrs().stream()
125+
Optional.ofNullable(contentResult.getEnrs())
126+
.orElseGet(Collections::emptyList)
127+
.stream()
110128
.map(historyNetwork::nodeRecordFromEnr)
111129
.flatMap(Optional::stream)
112130
.filter(node -> !queriedNodeIds.contains(node.getNodeId()))
@@ -133,4 +151,8 @@ private void queryPeer(final NodeRecord peer) {
133151
public Set<NodeRecord> getInterestedNodes() {
134152
return interestedNodes;
135153
}
154+
155+
public Optional<NodeRecord> getRespondingNode() {
156+
return respondingNode;
157+
}
136158
}

0 commit comments

Comments
 (0)