Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion core/src/main/java/samba/network/history/HistoryNetwork.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ public boolean store(Bytes contentKey, Bytes contentValue) {
return this.historyDB.saveContent(contentKey, contentValue);
}

private Optional<ContentBlockHeader> getAssociatedBlockHeader(Bytes contentKey) {
private Optional<ContentBlockHeader> getAssociatedBlockHeader(
Bytes contentKey) { // TODO: fallback mechanism in case found block header is not valid
try {
Bytes blockHeaderKeySsz =
Bytes.concatenate(Bytes.of(ContentType.BLOCK_HEADER.getByteValue()), contentKey.slice(1));
Expand Down Expand Up @@ -745,6 +746,11 @@ public Optional<FindContentResult> getContent(ContentKey contentKey, int timeout
CompletableFuture<Optional<FindContentResult>> future = task.execute();
try {
Optional<FindContentResult> result = future.join();
if (result.isPresent() && result.get().getContent() != null)
this.gossip(
task.getInterestedNodes(),
contentKey.getSszBytes(),
Bytes.fromHexString(result.get().getContent())); // POKE Mechanism
return result;
} catch (Exception e) {
LOG.error("Error when executing getContent", e);
Expand All @@ -766,6 +772,11 @@ public Optional<TraceGetContentResult> traceGetContent(
CompletableFuture<Optional<TraceGetContentResult>> future = task.execute();
try {
Optional<TraceGetContentResult> result = future.join();
if (result.isPresent() && result.get().getContent() != null)
this.gossip(
task.getInterestedNodes(),
contentKey.getSszBytes(),
Bytes.fromHexString(result.get().getContent())); // POKE Mechanism
return result;
} catch (Exception e) {
LOG.error("Error when executing traceGetContent", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import java.util.stream.Collectors;

import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt256;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.hyperledger.besu.crypto.Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,6 +35,7 @@ public class RecursiveLookupTaskFindContent {
private final Set<NodeRecord> foundNodes = new HashSet<>();
private Optional<FindContentResult> content = Optional.empty();
private final int timeout;
private final Set<NodeRecord> interestedNodes = new HashSet<>();

public RecursiveLookupTaskFindContent(
final HistoryNetwork historyNetwork,
Expand Down Expand Up @@ -108,6 +111,10 @@ private void queryPeer(final NodeRecord peer) {
.flatMap(Optional::stream)
.filter(node -> !queriedNodeIds.contains(node.getNodeId()))
.collect(Collectors.toSet()));
UInt256 peerDistance =
UInt256.fromBytes(peer.getNodeId().xor(Hash.sha256(contentKey)));
if (peerDistance.lessOrEqualThan(historyNetwork.getRadiusFromNode(peer)))
interestedNodes.add(peer);
}
sendRequests();
}
Expand All @@ -122,4 +129,8 @@ private void queryPeer(final NodeRecord peer) {
return null;
});
}

public Set<NodeRecord> getInterestedNodes() {
return interestedNodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private synchronized void sendRequests() {

final boolean closestCondition =
foundNodes.stream()
.limit(MAX_NODE_LIST_COUNT)
.filter(record -> !excludedNodes.contains(record))
.limit(MAX_NODE_LIST_COUNT)
.filter(
record ->
UInt256.fromBytes(record.getNodeId().xor(targetNodeId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public class RecursiveLookupTaskTraceFindContent {
private Optional<TraceGetContentResult> content = Optional.empty();
private final int timeout;

private final UInt256 origin; // Done
private final UInt256 targetId; // Done
private UInt256 receivedFrom; // Done
private final long startedAtMs; // Done
private final Map<UInt256, TraceResultResponseItemJson> responses = new HashMap<>(); // Done
private final UInt256 origin;
private final UInt256 targetId;
private UInt256 receivedFrom;
private final long startedAtMs;
private final Map<UInt256, TraceResultResponseItemJson> responses = new HashMap<>();
private final Map<UInt256, TraceResultMetadataObjectJson> metadata = new HashMap<>();
private final List<UInt256> cancelled = new ArrayList<>(); // Done
private final List<UInt256> cancelled = new ArrayList<>();
private final Set<NodeRecord> interestedNodes = new HashSet<>();

public RecursiveLookupTaskTraceFindContent(
final HistoryNetwork historyNetwork,
Expand Down Expand Up @@ -161,6 +162,10 @@ private void queryPeer(final NodeRecord peer) {
.flatMap(Optional::stream)
.filter(node -> !queriedNodeIds.contains(node.getNodeId()))
.collect(Collectors.toSet()));
UInt256 peerDistance =
UInt256.fromBytes(peer.getNodeId().xor(Hash.sha256(contentKey)));
if (peerDistance.lessOrEqualThan(historyNetwork.getRadiusFromNode(peer)))
interestedNodes.add(peer);
}
responses.put(
peerNodeId, new TraceResultResponseItemJson(durationMs, respondedWith));
Expand All @@ -177,4 +182,8 @@ private void queryPeer(final NodeRecord peer) {
return null;
});
}

public Set<NodeRecord> getInterestedNodes() {
return interestedNodes;
}
}
Loading