diff --git a/lib/src/const.dart b/lib/src/const.dart index 5cb5719e..4f432df1 100644 --- a/lib/src/const.dart +++ b/lib/src/const.dart @@ -17,4 +17,3 @@ const String ERROR__ERROR_DURING_PROCESSING_SEND = 'WR:3006 - One of the listene const String ERROR__CANT_PUT_ALREADY_EXISTING_INSTANCE = 'WR:4001 - Cant put already existing instance (unlock first)'; const String ERROR__CANT_FIND_INSTANCE_NULL = 'WR:4002 - Cant find instance its not set'; -const String ERROR__DATA_TYPE_MISMATCH = 'WR:5001 - WireData type mismatch'; diff --git a/lib/src/layers.dart b/lib/src/layers.dart index 5503e35f..4236c6af 100644 --- a/lib/src/layers.dart +++ b/lib/src/layers.dart @@ -52,14 +52,20 @@ class WireCommunicateLayer { } Future _transferOnWire(int wireId, [payload, scope]) async { + if (!_wireById.containsKey(wireId)) return null; final wire = _wireById[wireId]!; final isLookingInScope = scope != null; if (isLookingInScope && wire.scope != scope) return null; - final result = await wire.transfer(payload).catchError(_processSendError); - if (wire.withReplies && --wire.replies == 0) { - await _removeWire(wire); + + if (wire.withReplies) { + if (wire.replies == 1) { + await _removeWire(wire); + } else { + wire.replies--; + } } - return result; + + return await wire.transfer(payload).catchError(_processSendError); } WireSendError _processSendError(err) => WireSendError(ERROR__ERROR_DURING_PROCESSING_SEND, err as Exception); @@ -70,53 +76,38 @@ class WireCommunicateLayer { final withScope = scope != null; final withListener = listener != null; final toRemoveList = >[]; - final hasWires = _wireIdsBySignal.containsKey(signal); - if (hasWires) { - for (final wireId in _wireIdsBySignal[signal]!) { - if (_wireById.containsKey(wireId)) { - final wire = _wireById[wireId]!; - final isWrongScope = withScope && scope != wire.scope; - final isWrongListener = withListener && !wire.listenerEqual(listener!); - if (isWrongScope || isWrongListener) continue; - toRemoveList.add(wire); - } + await Future.forEach(_wireIdsBySignal[signal]!, (wireId) { + if (_wireById.containsKey(wireId)) { + final wire = _wireById[wireId]!; + final isWrongScope = withScope && scope != wire.scope; + final isWrongListener = withListener && !wire.listenerEqual(listener); + if (isWrongScope || isWrongListener) return; + toRemoveList.add(wire); } - } - if (toRemoveList.isNotEmpty) { - for (final wire in toRemoveList) { - await _removeWire(wire); - } - } + }); + await Future.forEach(toRemoveList, (Wire wireToRemove) => _removeWire(wireToRemove)); } return exists; } Future clear() async { - final wireToRemove = >[]; - _wireById.forEach((_, wire) => wireToRemove.add(wire)); - if (wireToRemove.isNotEmpty) { - for (final wire in wireToRemove) { - await _removeWire(wire); - } + final wiresToRemove = >[]; + _wireById.forEach((hash, wire) => wiresToRemove.add(wire)); + if (wiresToRemove.isNotEmpty) { + await Future.forEach(wiresToRemove, (Wire wire) => _removeWire(wire)); } + _wireById.clear(); _wireIdsBySignal.clear(); } List> getBySignal(String signal) { - if (hasSignal(signal)) { - return _wireIdsBySignal[signal]!.map((wireId) { - return _wireById[wireId]; - }).whereType>().toList(); - } - return []; + return hasSignal(signal) ? _wireIdsBySignal[signal]!.map((wid) => _wireById[wid]!).toList() : >[]; } List> getByScope(Object scope) { final result = >[]; - _wireById.forEach((_, wire) { - if (wire.scope == scope) result.add(wire); - }); + _wireById.forEach((_, wire) => {if (wire.scope == scope) result.add(wire)}); return result; } @@ -124,8 +115,11 @@ class WireCommunicateLayer { final result = >[]; // print('> Wire -> WireCommunicateLayer: getByListener, listener = ${listener}'); _wireById.forEach((_, wire) { - if (wire.listenerEqual(listener)) result.add(wire); + final compareListener = wire.listenerEqual(listener); + // print('\t compareListener = ${compareListener}'); + if (compareListener) result.add(wire); }); + // print('> \t result = ${result}'); return result; } @@ -153,7 +147,7 @@ class WireCommunicateLayer { final noMoreSignals = wireIdsForSignal.isEmpty; if (noMoreSignals) _wireIdsBySignal.remove(signal); - wire.clear(); + await wire.clear(); return noMoreSignals; } @@ -170,12 +164,8 @@ class WireMiddlewaresLayer { return _process((WireMiddleware m) => m.onData(key, prevValue, nextValue)); } - Future onDataError(dynamic error, String key, dynamic value) async { - return _process((WireMiddleware m) => m.onDataError(error, key, value)); - } - Future onReset(String key, dynamic prevValue) async { - return _process((WireMiddleware m) => m.onReset(key, prevValue)); + return _process((WireMiddleware m) => m.onData(key, prevValue, null)); } Future onRemove(String signal, {Object? scope, WireListener? listener}) async { @@ -213,11 +203,8 @@ class WireDataContainerLayer { Future clear() async { final wireDataToRemove = []; _dataMap.forEach((key, wireData) => wireDataToRemove.add(wireData)); - if (wireDataToRemove.isNotEmpty) { - for (final wireData in wireDataToRemove) { - await wireData.remove(clean: true); - } - } + await Future.forEach(wireDataToRemove, (WireData wireData) async => wireData.remove(clean: true)); + _dataMap.clear(); } } diff --git a/lib/src/main.dart b/lib/src/main.dart index 6e632007..454e8dea 100644 --- a/lib/src/main.dart +++ b/lib/src/main.dart @@ -114,7 +114,7 @@ class Wire { } /// Nullify all relations - void clear() { + Future clear() async { _scope = null; _listener = null; } @@ -195,13 +195,13 @@ class Wire { } static Future> _removeAllByScope(Object scope, {WireListener? listener}) async { - final listOfWiresForScope = List.from(_COMMUNICATION_LAYER.getByScope(scope)); + final List> listOfWiresForScope = List.from(_COMMUNICATION_LAYER.getByScope(scope)); return Future.wait(listOfWiresForScope .map((Wire wire) async => _removeAllBySignal(wire.signal, scope: scope, listener: listener))); } static Future> _removeAllByListener(WireListener listener) async { - final listOfWiresWithListener = List.from(_COMMUNICATION_LAYER.getByListener(listener)); + final List> listOfWiresWithListener = List.from(_COMMUNICATION_LAYER.getByListener(listener)); return Future.wait(listOfWiresWithListener .map((Wire wire) async => _removeAllBySignal(wire.signal, scope: wire.scope, listener: listener))); } @@ -218,18 +218,12 @@ class Wire { /// Returns [List] static List> get({String? signal, Object? scope, WireListener? listener, int? wireId}) { final result = >[]; - if (signal != null) { - result.addAll(_COMMUNICATION_LAYER.getBySignal(signal)); - } - if (scope != null) { - result.addAll(_COMMUNICATION_LAYER.getByScope(scope)); - } - if (listener != null) { - result.addAll(_COMMUNICATION_LAYER.getByListener(listener)); - } + if (signal != null) result.addAll(_COMMUNICATION_LAYER.getBySignal(signal)); + if (scope != null) result.addAll(_COMMUNICATION_LAYER.getByScope(scope)); + if (listener != null) result.addAll(_COMMUNICATION_LAYER.getByListener(listener)); if (wireId != null) { - final instance = _COMMUNICATION_LAYER.getByWireId(wireId); - if (instance != null) result.add(instance); + final wire = _COMMUNICATION_LAYER.getByWireId(wireId); + if (wire != null) result.add(wire); } return result; } @@ -287,9 +281,9 @@ class Wire { } /// Return an instance of an object by its type, throw an error in case it is not set - static T find() { - final key = T.toString(); + static dynamic find([R? instanceType]) { + final key = (instanceType ?? R).toString(); if (Wire.data(key).isSet == false) throw AssertionError(ERROR__CANT_FIND_INSTANCE_NULL); - return Wire.data(key).value! as T; + return Wire.data(key).value!; } } diff --git a/pubspec.yaml b/pubspec.yaml index e779ff09..247f83c0 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -4,7 +4,7 @@ version: 1.5.4 homepage: https://github.com/WiresWare/wire_dart environment: - sdk: '>=3.10.1 <4.0.0' + sdk: '>=2.17.5 <3.0.0' dev_dependencies: test: ^1.16.5 diff --git a/test/wire_test.dart b/test/wire_test.dart index 0b539be1..66a9a88c 100644 --- a/test/wire_test.dart +++ b/test/wire_test.dart @@ -371,6 +371,7 @@ void main() { final key = 'generic_key'; Wire.data(key, value: 10); expect(Wire.data(key).value, 10); + expect(() => Wire.data(key), throwsA(isA())); }); test('3.3 onError callback', () async { @@ -551,6 +552,22 @@ void main() { print('>\t WireSendResults.list.length = ${results.list.length}'); expect(results.list.length, 1); }); + + test('6.2 Race condition with replies', () async { + const signal = 'race_condition_signal'; + var callCount = 0; + await Wire.add( + Object(), + signal, + (_, __) async { + callCount++; + await Wire.send(signal); + }, + replies: 1, + ); + await Wire.send(signal); + expect(callCount, 1); + }); }); }