@@ -28,8 +28,7 @@
  * @param <Tuple_>
  */
 @NullMarked
-public final class RecordAndReplayPropagator<Tuple_ extends AbstractTuple>
-        implements Propagator {
+public final class RecordAndReplayPropagator<Tuple_ extends AbstractTuple> implements Propagator {
 
     private final Set<Object> retractQueue;
     private final Set<Object> insertQueue;
@@ -42,32 +41,29 @@ public final class RecordAndReplayPropagator<Tuple_ extends AbstractTuple>
     private final Supplier<BavetPrecomputeBuildHelper<Tuple_>> precomputeBuildHelperSupplier;
     private final UnaryOperator<Tuple_> internalTupleToOutputTupleMapper;
     private final Map<Object, List<Tuple_>> objectToOutputTuplesMap;
-    private final Map<Class<?>, Boolean> objectClassToIsEntitySourceClass;
+    private final Set<Object> alreadyUpdatingSet = Collections.newSetFromMap(new IdentityHashMap<>());
+    private final Map<Class<?>, Boolean> objectClassToIsEntitySourceClassMap;
 
     private final StaticPropagationQueue<Tuple_> propagationQueue;
 
-    public RecordAndReplayPropagator(
-            Supplier<BavetPrecomputeBuildHelper<Tuple_>> precomputeBuildHelperSupplier,
-            UnaryOperator<Tuple_> internalTupleToOutputTupleMapper,
-            TupleLifecycle<Tuple_> nextNodesTupleLifecycle, int size) {
+    public RecordAndReplayPropagator(Supplier<BavetPrecomputeBuildHelper<Tuple_>> precomputeBuildHelperSupplier,
+            UnaryOperator<Tuple_> internalTupleToOutputTupleMapper, TupleLifecycle<Tuple_> nextNodesTupleLifecycle, int size) {
         this.precomputeBuildHelperSupplier = precomputeBuildHelperSupplier;
         this.internalTupleToOutputTupleMapper = internalTupleToOutputTupleMapper;
         this.objectToOutputTuplesMap = CollectionUtils.newIdentityHashMap(size);
 
         // Guesstimate that updates are dominant.
         this.retractQueue = CollectionUtils.newIdentityHashSet(size / 20);
         this.insertQueue = CollectionUtils.newIdentityHashSet(size / 20);
-        this.objectClassToIsEntitySourceClass = new HashMap<>();
+        this.objectClassToIsEntitySourceClassMap = new HashMap<>();
         this.seenEntitySet = CollectionUtils.newIdentityHashSet(size);
         this.seenFactSet = CollectionUtils.newIdentityHashSet(size);
 
         this.propagationQueue = new StaticPropagationQueue<>(nextNodesTupleLifecycle);
     }
 
-    public RecordAndReplayPropagator(
-            Supplier<BavetPrecomputeBuildHelper<Tuple_>> precomputeBuildHelperSupplier,
-            UnaryOperator<Tuple_> internalTupleToOutputTupleMapper,
-            TupleLifecycle<Tuple_> nextNodesTupleLifecycle) {
+    public RecordAndReplayPropagator(Supplier<BavetPrecomputeBuildHelper<Tuple_>> precomputeBuildHelperSupplier,
+            UnaryOperator<Tuple_> internalTupleToOutputTupleMapper, TupleLifecycle<Tuple_> nextNodesTupleLifecycle) {
         this(precomputeBuildHelperSupplier, internalTupleToOutputTupleMapper, nextNodesTupleLifecycle, 1000);
     }
 
@@ -77,14 +73,18 @@ public void insert(Object object) {
     }
 
     public void update(Object object) {
+        if (!alreadyUpdatingSet.add(object)) {
+            // The list was already sent to the propagation queue.
+            // Don't iterate over it again, even though the queue would deduplicate its contents.
+            return;
+        }
         // Updates happen very frequently, so we optimize by avoiding the update queue
         // and going straight to the propagation queue.
         // The propagation queue deduplicates updates internally.
         var outTupleList = objectToOutputTuplesMap.get(object);
-        if (outTupleList == null) {
-            return;
+        if (outTupleList != null) {
+            outTupleList.forEach(propagationQueue::update);
         }
-        outTupleList.forEach(propagationQueue::update);
     }
 
     public void retract(Object object) {
@@ -122,7 +122,7 @@ public void propagateRetracts() {
         // Do not remove queued retracts from inserts; if a fact property
         // change, there will be both a retract and insert for that fact
         for (var object : insertQueue) {
-            if (objectClassToIsEntitySourceClass.computeIfAbsent(object.getClass(),
+            if (objectClassToIsEntitySourceClassMap.computeIfAbsent(object.getClass(),
                     precomputeBuildHelper::isSourceEntityClass)) {
                 seenEntitySet.add(object);
             } else {
@@ -158,6 +158,7 @@ private static <A> List<BavetRootNode<A>> getRootNodes(Object object, NodeNetwor
     @Override
     public void propagateUpdates() {
         propagationQueue.propagateUpdates();
+        alreadyUpdatingSet.clear();
     }
 
     @Override
@@ -192,8 +193,7 @@ private void invalidateCache() {
         objectToOutputTuplesMap.clear();
     }
 
-    private void recalculateTuples(NodeNetwork internalNodeNetwork,
-            Map<Class<?>, List<BavetRootNode<?>>> classToRootNodeList,
+    private void recalculateTuples(NodeNetwork internalNodeNetwork, Map<Class<?>, List<BavetRootNode<?>>> classToRootNodeList,
             RecordingTupleLifecycle<Tuple_> recordingTupleLifecycle) {
         var internalTupleToOutputTupleMap = new IdentityHashMap<Tuple_, Tuple_>(seenEntitySet.size());
         for (var invalidated : seenEntitySet) {
@@ -207,12 +207,12 @@ private void recalculateTuples(NodeNetwork internalNodeNetwork,
                 internalNodeNetwork.settle();
             }
             if (mappedTuples.isEmpty()) {
-                objectToOutputTuplesMap.put(invalidated, Collections.emptyList());
+                objectToOutputTuplesMap.remove(invalidated);
             } else {
                 objectToOutputTuplesMap.put(invalidated, mappedTuples);
             }
         }
         objectToOutputTuplesMap.values().stream().flatMap(List::stream).forEach(this::insertIfAbsent);
     }
 
 }
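The change pairs an identity-based guard in update() with a clear() in propagateUpdates(): repeated update(object) calls within one propagation cycle skip re-walking the object's output tuple list, and the guard resets once the cycle's updates have been propagated. A minimal, self-contained sketch of that pattern follows; the class and method names are hypothetical and this is not the actual Bavet API.

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the "already updating" guard: repeated update(object)
// calls within one propagation cycle only walk the object's tuple list once,
// and the guard is cleared after propagation so the next cycle starts fresh.
final class UpdateGuardSketch<Tuple_> {

    private final Map<Object, List<Tuple_>> objectToTuples = new IdentityHashMap<>();
    private final Set<Object> alreadyUpdatingSet = Collections.newSetFromMap(new IdentityHashMap<>());
    private final List<Tuple_> propagationQueue = new ArrayList<>();
    private final Consumer<Tuple_> downstream;

    UpdateGuardSketch(Consumer<Tuple_> downstream) {
        this.downstream = downstream;
    }

    void register(Object object, List<Tuple_> tuples) {
        objectToTuples.put(object, tuples);
    }

    void update(Object object) {
        if (!alreadyUpdatingSet.add(object)) {
            // The object's tuples were already queued in this cycle; skip the repeated walk.
            return;
        }
        var tuples = objectToTuples.get(object);
        if (tuples != null) {
            propagationQueue.addAll(tuples);
        }
    }

    void propagateUpdates() {
        propagationQueue.forEach(downstream);
        propagationQueue.clear();
        // Reset the guard so the next cycle sees fresh updates for the same objects.
        alreadyUpdatingSet.clear();
    }
}

Calling update(o) twice in one cycle walks o's tuple list only once; after propagateUpdates() the guard is empty again, so a later cycle picks up new changes to the same object.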