# use concurrent.futures.Executor instead of multiprocessing pool to resolve conflict with duet #7938
base: main
Changes from all commits: e98da30, 9ebdd7e, 23aaef5, 2d92d28, 3fc08ba, 762dabc
```diff
@@ -16,7 +16,7 @@
 from __future__ import annotations

-import multiprocessing
+import concurrent.futures as cf
 import multiprocessing.pool
 from collections.abc import Sequence
 from typing import Any, TYPE_CHECKING
```

```diff
@@ -46,7 +46,7 @@ def z_phase_calibration_workflow(
     cycle_depths: Sequence[int] = tuple(np.arange(3, 100, 20)),
     random_state: cirq.RANDOM_STATE_OR_SEED_LIKE = None,
     atol: float = 1e-3,
-    num_workers_or_pool: int | multiprocessing.pool.Pool = -1,
+    num_workers_or_pool: int | multiprocessing.pool.Pool | cf.Executor = -1,
```
> **Collaborator:** AFAICT, the code later needs only the `Pool.map` or `Executor.map` functions. Would it be possible to change this to accept either an …
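As the comment notes, only a `.map`-style callable is used downstream, so the parameter could be decoupled from any particular pool class. A hedged sketch of that idea, assuming nothing beyond the standard library (the `as_map_func` helper is hypothetical, not part of this PR):

```python
import concurrent.futures as cf
import multiprocessing.pool
from collections.abc import Callable, Iterable
from typing import Any

# A "map-like" callable: anything with the calling shape of builtins.map.
MapFunc = Callable[..., Iterable[Any]]

def as_map_func(num_workers_or_pool) -> MapFunc:
    """Normalize an int, Pool, or Executor into a map-style callable.

    Hypothetical helper; the PR itself keeps the int | Pool | Executor union.
    A real implementation would also track ownership of the locally created
    executor so the caller can shut it down.
    """
    if isinstance(num_workers_or_pool, (multiprocessing.pool.Pool, cf.Executor)):
        return num_workers_or_pool.map
    if num_workers_or_pool == 0:
        return map  # serial evaluation, no pool at all
    max_workers = num_workers_or_pool if num_workers_or_pool > 0 else None
    return cf.ThreadPoolExecutor(max_workers).map
```

Downstream code would then call `map_func(fn, items)` without caring which backend produced it.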
```diff
     pairs: Sequence[tuple[cirq.GridQubit, cirq.GridQubit]] | None = None,
     tags: Sequence[Any] = (),
 ) -> tuple[xeb_fitting.XEBCharacterizationResult, pd.DataFrame]:
```
```diff
@@ -81,7 +81,7 @@ def z_phase_calibration_workflow(
         cycle_depths: The cycle depths to use.
         random_state: The random state to use.
         atol: Absolute tolerance to be used by the minimizer.
-        num_workers_or_pool: An optional multi-processing pool or number of workers.
+        num_workers_or_pool: An optional pool or number of workers.
             A zero value means no multiprocessing.
             A positive integer value will create a pool with the given number of workers.
             A negative value will create pool with maximum number of workers.
```

```diff
@@ -92,12 +92,12 @@ def z_phase_calibration_workflow(
         - A `pd.DataFrame` comparing the before and after fidelities.
     """

-    pool: multiprocessing.pool.Pool | None = None
+    pool: multiprocessing.pool.Pool | cf.Executor | None = None
     local_pool = False
-    if isinstance(num_workers_or_pool, multiprocessing.pool.Pool):
+    if isinstance(num_workers_or_pool, (multiprocessing.pool.Pool, cf.Executor)):
         pool = num_workers_or_pool  # pragma: no cover
     elif num_workers_or_pool != 0:
-        pool = multiprocessing.Pool(num_workers_or_pool if num_workers_or_pool > 0 else None)
+        pool = cf.ThreadPoolExecutor(num_workers_or_pool if num_workers_or_pool > 0 else None)
```
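The dispatch logic in this hunk can be exercised in isolation. A minimal standalone sketch (the `resolve_pool` name is illustrative, not part of the PR):

```python
from __future__ import annotations

import concurrent.futures as cf
import multiprocessing.pool

def resolve_pool(
    num_workers_or_pool: int | multiprocessing.pool.Pool | cf.Executor,
) -> tuple[multiprocessing.pool.Pool | cf.Executor | None, bool]:
    """Mirror the PR's pool selection: returns (pool, local_pool)."""
    pool: multiprocessing.pool.Pool | cf.Executor | None = None
    local_pool = False
    if isinstance(num_workers_or_pool, (multiprocessing.pool.Pool, cf.Executor)):
        pool = num_workers_or_pool  # caller-owned pool: do not shut it down here
    elif num_workers_or_pool != 0:
        # Any negative value maps to max_workers=None, i.e. the default worker count.
        pool = cf.ThreadPoolExecutor(num_workers_or_pool if num_workers_or_pool > 0 else None)
        local_pool = True
    return pool, local_pool
```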
|
> **Collaborator:** `ThreadPoolExecutor` is subject to the GIL. Unless the mapped function spends a lot of time in numpy calls or waiting for IO, the execution would be the same as a serial call, or worse due to thread-switching overhead. I made a quick test with many-term sums computed serially and in parallel with `multiprocessing.Pool.map` vs `ThreadPoolExecutor.map`. The `ThreadPoolExecutor` took about 2.5 times longer than a serial evaluation.
>
> Example timing code:
>
> ```python
> def partial_sum(start_end: tuple[int, int]) -> float:
>     total = 0
>     for i in range(*start_end, 3):
>         total += (-1) ** i * 1.0 / i
>     return total
>
> def tedious_sum(terms_count: int, mapfunc) -> float:
>     total = sum(mapfunc(partial_sum, ((start, terms_count) for start in (1, 2, 3))))
>     return total
>
> # %timeit tedious_sum(10_000_000, map)
> # 2.93 s ± 51.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> # pool = multiprocessing.Pool(3)
> # %timeit tedious_sum(10_000_000, pool.map)
> # 1.01 s ± 25.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>
> # tpx = concurrent.futures.ThreadPoolExecutor(3)
> # %timeit tedious_sum(10_000_000, tpx.map)
> # 8.46 s ± 1.11 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
> ```
>
> Can you make a quick comparison of the … If comparable, I'd suggest making serial evaluation the default.
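If serial evaluation became the default, the surrounding code could stay Executor-shaped. One hedged option (the `SerialExecutor` class below is hypothetical, not from the PR) is a small `cf.Executor` subclass whose `submit` runs the call inline; since the base class implements `map` in terms of `submit` and `shutdown` as a no-op, `.map` and `.shutdown()` keep working with no threads involved:

```python
import concurrent.futures as cf

class SerialExecutor(cf.Executor):
    """Executor that runs each submitted call inline in the calling thread.

    Hypothetical sketch: gives serial evaluation the same .map/.shutdown
    interface as ThreadPoolExecutor, so callers need no special-casing.
    """

    def submit(self, fn, /, *args, **kwargs):
        future: cf.Future = cf.Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:
            # Propagate errors through the future, like real executors do.
            future.set_exception(exc)
        return future
```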
```diff
         local_pool = True

     fids_df_0, circuits, sampled_df = parallel_xeb_workflow(
```

```diff
@@ -143,8 +143,8 @@ def z_phase_calibration_workflow(
     )

     if local_pool:
-        assert isinstance(pool, multiprocessing.pool.Pool)
-        pool.close()
+        assert isinstance(pool, cf.Executor)
+        pool.shutdown()
```
|
> **Collaborator** (on lines +146 to +147): Consider wrapping the local …
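One way to implement the suggestion is with `contextlib`: a locally created executor is entered with `with` (whose exit calls `shutdown()`, even on exceptions), while a caller-supplied executor passes through `nullcontext` untouched. A hedged sketch under that assumption (`executor_scope` is an illustrative name, not from the PR):

```python
import concurrent.futures as cf
import contextlib

def executor_scope(num_workers_or_pool):
    """Return a context manager that yields an Executor, or None for serial.

    A locally created executor is shut down on exit; a caller-supplied
    executor is wrapped in nullcontext and left running for its owner.
    """
    if isinstance(num_workers_or_pool, cf.Executor):
        return contextlib.nullcontext(num_workers_or_pool)
    if num_workers_or_pool == 0:
        return contextlib.nullcontext(None)  # serial evaluation
    max_workers = num_workers_or_pool if num_workers_or_pool > 0 else None
    # Executor is itself a context manager: __exit__ calls shutdown(wait=True).
    return cf.ThreadPoolExecutor(max_workers)
```

This removes the `local_pool` flag and the trailing `assert`/`shutdown()` pair entirely.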
```diff
     return result, before_after
```

```diff
@@ -159,7 +159,7 @@ def calibrate_z_phases(
     cycle_depths: Sequence[int] = tuple(np.arange(3, 100, 20)),
     random_state: cirq.RANDOM_STATE_OR_SEED_LIKE = None,
     atol: float = 1e-3,
-    num_workers_or_pool: int | multiprocessing.pool.Pool = -1,
+    num_workers_or_pool: int | multiprocessing.pool.Pool | cf.Executor = -1,
     pairs: Sequence[tuple[cirq.GridQubit, cirq.GridQubit]] | None = None,
     tags: Sequence[Any] = (),
 ) -> dict[tuple[cirq.Qid, cirq.Qid], cirq.PhasedFSimGate]:
```
> **Collaborator:** Nit: can we use the `futures` module name here for less indirection, like in `Cirq/cirq-core/cirq/experiments/benchmarking/parallel_xeb.py`, line 20 in 1ac366c?