Conversation
There was a problem hiding this comment.
Pull Request Overview
Adds a PipeWire audio backend to provide native support for the modern PipeWire audio server on Linux systems, offering low-latency audio playback as an alternative to existing backends.
- Implements a complete PipeWire audio sink with proper stream management and error handling
- Adds feature flags and dependency configuration for the new backend
- Integrates the backend into the existing audio backend selection system
Reviewed Changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| playback/src/audio_backend/pipewire.rs | Complete PipeWire sink implementation with audio format conversion and stream handling |
| playback/src/audio_backend/mod.rs | Registers the PipeWire backend in the available backends list |
| playback/Cargo.toml | Adds pipewire and libspa dependencies for the playback crate |
| Cargo.toml | Defines the pipewire-backend feature and workspace-level dependencies |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| for &byte in data { | ||
| sender.send(byte).map_err(|_| PipeWireError::NotConnected)?; | ||
| } |
There was a problem hiding this comment.
Sending audio data byte-by-byte through a channel is highly inefficient and will likely cause audio dropouts. Consider sending data in chunks or using a different approach like a ring buffer for better performance.
| F32 => spa::param::audio::AudioFormat::F32LE, | ||
| S32 => spa::param::audio::AudioFormat::S32LE, | ||
| S24 => spa::param::audio::AudioFormat::S24LE, | ||
| S24_3 => spa::param::audio::AudioFormat::S24_32LE, |
There was a problem hiding this comment.
The audio format mapping for S24_3 appears incorrect. S24_3 typically represents 24-bit samples packed in 3 bytes, but S24_32LE represents 24-bit samples in 32-bit containers. This should likely be S24_3LE if available.
| S24_3 => spa::param::audio::AudioFormat::S24_32LE, | |
| S24_3 => spa::param::audio::AudioFormat::S24_3LE, |
| for i in 0..total_bytes { | ||
| match receiver.try_recv() { | ||
| Ok(byte) => { | ||
| slice[i] = byte; | ||
| bytes_read += 1; | ||
| } | ||
| Err(_) => break, | ||
| } | ||
| } |
There was a problem hiding this comment.
Reading audio data byte-by-byte in the audio callback will cause significant performance issues and audio dropouts. The audio callback should be lock-free and operate on larger chunks of data.
| self.initialized = true; | ||
|
|
||
| // Give the thread a moment to initialize | ||
| thread::sleep(std::time::Duration::from_millis(100)); |
There was a problem hiding this comment.
Using a fixed sleep duration for initialization is unreliable. Consider using proper synchronization mechanisms like condition variables or status callbacks to ensure the PipeWire stream is actually ready before returning.
| // Wait for the thread to finish with a timeout | ||
| if let Some(handle) = self._main_loop_handle.take() { | ||
| // Give it a moment to exit gracefully | ||
| thread::sleep(std::time::Duration::from_millis(100)); |
There was a problem hiding this comment.
Using a fixed sleep duration before joining the thread is unreliable. The thread should signal when it's ready to exit, or use a timeout with the join operation.
roderickvd
left a comment
There was a problem hiding this comment.
Yes we will consider 😆 very nice addition. It needs work; also take a look at some of the Copilot suggestions which seem to make sense to me.
Don't forget to add a changelog entry too.
Cargo.toml
Outdated
| ] } | ||
| url = "2.2" | ||
|
|
||
| # Dependencies for pipewire |
There was a problem hiding this comment.
These should go to playback/Cargo.toml.
|
Hi Roderick, |
|
First a bit of a low-effort but hopefully high-impact reply: does looking at RustAudio/cpal#938 help? That implementation is not perfect, but the use of In your fork I see that you're pushing a Hope this helps. |
roderickvd
left a comment
There was a problem hiding this comment.
Sorry for the late feedback. Here is a further review.
Also if you would add a changelog entry?
| } | ||
|
|
||
| impl Open for PipeWireSink { | ||
| fn open(_device: Option<String>, format: AudioFormat) -> Self { |
There was a problem hiding this comment.
I think we could not ignore device and around line 238 where you create the stream add:
*pw::keys::TARGET_OBJECT => device.as_deref().unwrap_or("")Probably it would then also need:
- logic to use the "default" device (skipping
TARGET_OBJECT?) - support for object enumeration
Point 2 may be complex so for this PR we could also device to leave enumeration for a next PR and just accept a device parameter now.
| if let Some(handle) = self._main_loop_handle.take() { | ||
| // Give it a moment to exit gracefully | ||
| thread::sleep(std::time::Duration::from_millis(100)); | ||
| let _ = handle.join(); |
There was a problem hiding this comment.
May want to propagate any errors.
| // Wait for the thread to finish with a timeout | ||
| if let Some(handle) = self._main_loop_handle.take() { | ||
| // Give it a moment to exit gracefully | ||
| thread::sleep(std::time::Duration::from_millis(100)); |
| info!("Stopping PipeWire sink..."); | ||
|
|
||
| // Signal the thread to quit | ||
| self.quit_flag.store(true, Ordering::Relaxed); |
There was a problem hiding this comment.
For safety we may want to consider stronger ordering here, like Ordering::Release, because below there's some operations and then you're resetting the quit_flag again - if it matters that this truly happens after the operations...
| .add_local_listener_with_user_data((consumer, quit_flag.clone())) | ||
| .process(move |stream, (consumer, quit_flag)| { | ||
| // Check if we should quit | ||
| if quit_flag.load(Ordering::Relaxed) { |
| let bytes_read = consumer.pop_slice(slice); | ||
|
|
||
| // Fill any remaining space with silence if underrun | ||
| if bytes_read < total_bytes { |
There was a problem hiding this comment.
Consider adding logging in such cases:
if bytes_read < total_bytes {
warn!("Buffer underrun: requested {} bytes, got {}", total_bytes, bytes_read);
slice[bytes_read..total_bytes].fill(0);
}Makes it easier to debug.
|
|
||
| // Fill any remaining space with silence if underrun | ||
| if bytes_read < total_bytes { | ||
| slice[bytes_read..total_bytes].fill(0); |
There was a problem hiding this comment.
For F32 and F64 formats, I'm not sure if u8 byte values of 0 are equal to 0.0 in float.
| self.initialized = true; | ||
|
|
||
| // Give the thread a moment to initialize | ||
| thread::sleep(std::time::Duration::from_millis(100)); |
There was a problem hiding this comment.
Better to use a channel according to this pattern:
// In the struct:
pub struct PipeWireSink {
// ... existing fields ...
init_rx: Option<oneshot::Receiver<Result<(), String>>>,
}
// In start():
let (init_tx, init_rx) = oneshot::channel();
self.init_rx = Some(init_rx);
let handle = thread::spawn(move || {
match initialize_pipewire_stream(...) {
Ok(stream) => {
let _ = init_tx.send(Ok(()));
run_main_loop(stream, ...);
}
Err(e) => {
let _ = init_tx.send(Err(e.to_string()));
}
}
});
// Wait for initialization with timeout:
match self.init_rx.take().unwrap().recv_timeout(Duration::from_secs(1)) {
Ok(Ok(())) => { /* success */ },
Ok(Err(e)) => return Err(...),
Err(_) => return Err(timeout_error),
}| producer: Option<RingProducer>, | ||
| // Flag to signal thread to stop | ||
| quit_flag: Arc<AtomicBool>, | ||
| initialized: bool, |
There was a problem hiding this comment.
This could be removed in favor of checking _main_loop_handle.is_some().
For your consideration : My implementation of a native pipewire audio-backend.