Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/src/const.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ const String ERROR__ERROR_DURING_PROCESSING_SEND = 'WR:3006 - One of the listene

const String ERROR__CANT_PUT_ALREADY_EXISTING_INSTANCE = 'WR:4001 - Cant put already existing instance (unlock first)';
const String ERROR__CANT_FIND_INSTANCE_NULL = 'WR:4002 - Cant find instance its not set';
const String ERROR__DATA_TYPE_MISMATCH = 'WR:5001 - WireData type mismatch';
81 changes: 34 additions & 47 deletions lib/src/layers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,20 @@ class WireCommunicateLayer {
}

Future<dynamic> _transferOnWire(int wireId, [payload, scope]) async {
if (!_wireById.containsKey(wireId)) return null;
final wire = _wireById[wireId]!;
final isLookingInScope = scope != null;
if (isLookingInScope && wire.scope != scope) return null;
final result = await wire.transfer(payload).catchError(_processSendError);
if (wire.withReplies && --wire.replies == 0) {
await _removeWire(wire);

if (wire.withReplies) {
if (wire.replies == 1) {
await _removeWire(wire);
} else {
wire.replies--;
}
}
return result;

return await wire.transfer(payload).catchError(_processSendError);
}

WireSendError _processSendError(err) => WireSendError(ERROR__ERROR_DURING_PROCESSING_SEND, err as Exception);
Expand All @@ -70,62 +76,50 @@ class WireCommunicateLayer {
final withScope = scope != null;
final withListener = listener != null;
final toRemoveList = <Wire<dynamic>>[];
final hasWires = _wireIdsBySignal.containsKey(signal);
if (hasWires) {
for (final wireId in _wireIdsBySignal[signal]!) {
if (_wireById.containsKey(wireId)) {
final wire = _wireById[wireId]!;
final isWrongScope = withScope && scope != wire.scope;
final isWrongListener = withListener && !wire.listenerEqual(listener!);
if (isWrongScope || isWrongListener) continue;
toRemoveList.add(wire);
}
await Future.forEach(_wireIdsBySignal[signal]!, (wireId) {
if (_wireById.containsKey(wireId)) {
final wire = _wireById[wireId]!;
final isWrongScope = withScope && scope != wire.scope;
final isWrongListener = withListener && !wire.listenerEqual(listener);
if (isWrongScope || isWrongListener) return;
toRemoveList.add(wire);
}
}
if (toRemoveList.isNotEmpty) {
for (final wire in toRemoveList) {
await _removeWire(wire);
}
}
});
await Future.forEach(toRemoveList, (Wire<dynamic> wireToRemove) => _removeWire(wireToRemove));
}
return exists;
}

Future<void> clear() async {
final wireToRemove = <Wire<dynamic>>[];
_wireById.forEach((_, wire) => wireToRemove.add(wire));
if (wireToRemove.isNotEmpty) {
for (final wire in wireToRemove) {
await _removeWire(wire);
}
final wiresToRemove = <Wire<dynamic>>[];
_wireById.forEach((hash, wire) => wiresToRemove.add(wire));
if (wiresToRemove.isNotEmpty) {
await Future.forEach(wiresToRemove, (Wire<dynamic> wire) => _removeWire(wire));
}

_wireById.clear();
_wireIdsBySignal.clear();
}

List<Wire<dynamic>> getBySignal(String signal) {
if (hasSignal(signal)) {
return _wireIdsBySignal[signal]!.map((wireId) {
return _wireById[wireId];
}).whereType<Wire<dynamic>>().toList();
}
return [];
return hasSignal(signal) ? _wireIdsBySignal[signal]!.map((wid) => _wireById[wid]!).toList() : <Wire<dynamic>>[];
}

List<Wire<dynamic>> getByScope(Object scope) {
final result = <Wire<dynamic>>[];
_wireById.forEach((_, wire) {
if (wire.scope == scope) result.add(wire);
});
_wireById.forEach((_, wire) => {if (wire.scope == scope) result.add(wire)});
return result;
}

List<Wire<dynamic>> getByListener(WireListener<dynamic> listener) {
final result = <Wire<dynamic>>[];
// print('> Wire -> WireCommunicateLayer: getByListener, listener = ${listener}');
_wireById.forEach((_, wire) {
if (wire.listenerEqual(listener)) result.add(wire);
final compareListener = wire.listenerEqual(listener);
// print('\t compareListener = ${compareListener}');
if (compareListener) result.add(wire);
});
// print('> \t result = ${result}');
return result;
}

Expand Down Expand Up @@ -153,7 +147,7 @@ class WireCommunicateLayer {
final noMoreSignals = wireIdsForSignal.isEmpty;
if (noMoreSignals) _wireIdsBySignal.remove(signal);

wire.clear();
await wire.clear();

return noMoreSignals;
}
Expand All @@ -170,12 +164,8 @@ class WireMiddlewaresLayer {
return _process((WireMiddleware m) => m.onData(key, prevValue, nextValue));
}

Future<void> onDataError(dynamic error, String key, dynamic value) async {
return _process((WireMiddleware m) => m.onDataError(error, key, value));
}

Future<void> onReset(String key, dynamic prevValue) async {
return _process((WireMiddleware m) => m.onReset(key, prevValue));
return _process((WireMiddleware m) => m.onData(key, prevValue, null));
}

Future<void> onRemove(String signal, {Object? scope, WireListener<dynamic>? listener}) async {
Expand Down Expand Up @@ -213,11 +203,8 @@ class WireDataContainerLayer {
Future<void> clear() async {
final wireDataToRemove = <WireData>[];
_dataMap.forEach((key, wireData) => wireDataToRemove.add(wireData));
if (wireDataToRemove.isNotEmpty) {
for (final wireData in wireDataToRemove) {
await wireData.remove(clean: true);
}
}
await Future.forEach(wireDataToRemove, (WireData wireData) async => wireData.remove(clean: true));

_dataMap.clear();
}
}
28 changes: 11 additions & 17 deletions lib/src/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class Wire<T> {
}

/// Nullify all relations
void clear() {
Future<void> clear() async {
_scope = null;
_listener = null;
}
Expand Down Expand Up @@ -195,13 +195,13 @@ class Wire<T> {
}

static Future<List<bool>> _removeAllByScope<T>(Object scope, {WireListener<dynamic>? listener}) async {
final listOfWiresForScope = List.from(_COMMUNICATION_LAYER.getByScope(scope));
final List<Wire<dynamic>> listOfWiresForScope = List.from(_COMMUNICATION_LAYER.getByScope(scope));
return Future.wait(listOfWiresForScope
.map((Wire<dynamic> wire) async => _removeAllBySignal(wire.signal, scope: scope, listener: listener)));
}

static Future<List<bool>> _removeAllByListener(WireListener<dynamic> listener) async {
final listOfWiresWithListener = List.from(_COMMUNICATION_LAYER.getByListener(listener));
final List<Wire<dynamic>> listOfWiresWithListener = List.from(_COMMUNICATION_LAYER.getByListener(listener));
return Future.wait(listOfWiresWithListener
.map((Wire<dynamic> wire) async => _removeAllBySignal(wire.signal, scope: wire.scope, listener: listener)));
}
Expand All @@ -218,18 +218,12 @@ class Wire<T> {
/// Returns [List<Wire>]
static List<Wire<dynamic>> get<T>({String? signal, Object? scope, WireListener<dynamic>? listener, int? wireId}) {
final result = <Wire<dynamic>>[];
if (signal != null) {
result.addAll(_COMMUNICATION_LAYER.getBySignal(signal));
}
if (scope != null) {
result.addAll(_COMMUNICATION_LAYER.getByScope(scope));
}
if (listener != null) {
result.addAll(_COMMUNICATION_LAYER.getByListener(listener));
}
if (signal != null) result.addAll(_COMMUNICATION_LAYER.getBySignal(signal));
if (scope != null) result.addAll(_COMMUNICATION_LAYER.getByScope(scope));
if (listener != null) result.addAll(_COMMUNICATION_LAYER.getByListener(listener));
if (wireId != null) {
final instance = _COMMUNICATION_LAYER.getByWireId(wireId);
if (instance != null) result.add(instance);
final wire = _COMMUNICATION_LAYER.getByWireId(wireId);
if (wire != null) result.add(wire);
}
return result;
}
Expand Down Expand Up @@ -287,9 +281,9 @@ class Wire<T> {
}

/// Return an instance of an object by its type, throw an error in case it is not set
static T find<T>() {
final key = T.toString();
static dynamic find<R>([R? instanceType]) {
final key = (instanceType ?? R).toString();
if (Wire.data(key).isSet == false) throw AssertionError(ERROR__CANT_FIND_INSTANCE_NULL);
return Wire.data(key).value! as T;
return Wire.data(key).value!;
}
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version: 1.5.4
homepage: https://github.com/WiresWare/wire_dart

environment:
sdk: '>=3.10.1 <4.0.0'
sdk: '>=2.17.5 <3.0.0'

dev_dependencies:
test: ^1.16.5
Expand Down
17 changes: 17 additions & 0 deletions test/wire_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ void main() {
final key = 'generic_key';
Wire.data<int>(key, value: 10);
expect(Wire.data<int>(key).value, 10);
expect(() => Wire.data<String>(key), throwsA(isA<Exception>()));
});

test('3.3 onError callback', () async {
Expand Down Expand Up @@ -551,6 +552,22 @@ void main() {
print('>\t WireSendResults.list.length = ${results.list.length}');
expect(results.list.length, 1);
});

test('6.2 Race condition with replies', () async {
const signal = 'race_condition_signal';
var callCount = 0;
await Wire.add(
Object(),
signal,
(_, __) async {
callCount++;
await Wire.send(signal);
},
replies: 1,
);
await Wire.send(signal);
expect(callCount, 1);
});
});
}

Expand Down