Skip to content

Commit 647a79e

Browse files
committed
retain CPU affinity when rescheduling
1 parent 98719b0 commit 647a79e

File tree

5 files changed

+70
-40
lines changed

5 files changed

+70
-40
lines changed

src/process/clone.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ pub async fn sys_clone(
191191
.lock_save_irq()
192192
.insert(desc.tid, Arc::downgrade(&work));
193193

194-
sched::insert_task_cross_cpu(work);
194+
sched::insert_work_cross_cpu(work);
195195

196196
NUM_FORKS.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
197197

src/sched/mod.rs

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::arch::{Arch, ArchImpl};
12
use crate::drivers::timer::{Instant, now};
23
#[cfg(feature = "smp")]
34
use crate::interrupts::cpu_messenger::{Message, message_cpu};
@@ -6,7 +7,7 @@ use crate::process::owned::OwnedTask;
67
use crate::{per_cpu_private, per_cpu_shared, process::TASK_LIST};
78
use alloc::{boxed::Box, sync::Arc, vec::Vec};
89
use core::fmt::Debug;
9-
use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10+
use core::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
1011
use core::task::Waker;
1112
use core::time::Duration;
1213
use current::{CUR_TASK_PTR, current_task};
@@ -70,6 +71,10 @@ per_cpu_private! {
7071
static SCHED_STATE: SchedState = SchedState::new;
7172
}
7273

74+
per_cpu_shared! {
75+
pub static SHARED_SCHED_STATE: SharedSchedState = SharedSchedState::new;
76+
}
77+
7378
/// Default time-slice assigned to runnable tasks.
7479
const DEFAULT_TIME_SLICE: Duration = Duration::from_millis(4);
7580

@@ -129,18 +134,22 @@ pub fn spawn_kernel_work(fut: impl Future<Output = ()> + 'static + Send) {
129134
current_task().ctx.put_kernel_work(Box::pin(fut));
130135
}
131136

132-
/// Global atomic storing info about the least-tasked CPU.
133-
/// First 16 bits: CPU ID
134-
/// Remaining 48 bits: Run-queue weight
135-
#[cfg(feature = "smp")]
136-
static LEAST_TASKED_CPU_INFO: AtomicU64 = AtomicU64::new(0);
137-
const WEIGHT_SHIFT: u32 = 16;
138-
139137
#[cfg(feature = "smp")]
140138
fn get_best_cpu() -> CpuId {
141-
// Get the CPU with the least number of tasks.
142-
let least_tasked_cpu_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire);
143-
CpuId::from_value((least_tasked_cpu_info & 0xffff) as usize)
139+
let r = 0..ArchImpl::cpu_count();
140+
r.min_by(|&x, &y| {
141+
// TODO: Find a way to calculate already assigned affinities and account for that
142+
let info_x = SHARED_SCHED_STATE.get_by_cpu(x);
143+
let info_y = SHARED_SCHED_STATE.get_by_cpu(y);
144+
let weight_x = info_x.total_runq_weight.load(Ordering::Relaxed);
145+
let weight_y = info_y.total_runq_weight.load(Ordering::Relaxed);
146+
weight_x.cmp(&weight_y)
147+
})
148+
.map(CpuId::from_value)
149+
.unwrap_or_else(|| {
150+
warn!("No CPUs found when trying to get best CPU! Defaulting to CPU 0");
151+
CpuId::from_value(0)
152+
})
144153
}
145154

146155
/// Insert the given task onto a CPU's run queue.
@@ -149,17 +158,27 @@ pub fn insert_work(work: Arc<Work>) {
149158
}
150159

151160
#[cfg(feature = "smp")]
152-
pub fn insert_task_cross_cpu(task: Arc<Work>) {
153-
let cpu = get_best_cpu();
161+
pub fn insert_work_cross_cpu(work: Arc<Work>) {
162+
let last_cpu = work
163+
.sched_data
164+
.lock_save_irq()
165+
.as_ref()
166+
.map(|s| s.last_cpu)
167+
.unwrap_or(usize::MAX);
168+
let cpu = if last_cpu == usize::MAX {
169+
get_best_cpu()
170+
} else {
171+
CpuId::from_value(last_cpu)
172+
};
154173
if cpu == CpuId::this() {
155-
SCHED_STATE.borrow_mut().run_q.add_work(task);
174+
SCHED_STATE.borrow_mut().run_q.add_work(work);
156175
} else {
157-
message_cpu(cpu, Message::EnqueueWork(task)).expect("Failed to send task to CPU");
176+
message_cpu(cpu, Message::EnqueueWork(work)).expect("Failed to send task to CPU");
158177
}
159178
}
160179

161180
#[cfg(not(feature = "smp"))]
162-
pub fn insert_task_cross_cpu(task: Arc<Work>) {
181+
pub fn insert_work_cross_cpu(task: Arc<Work>) {
163182
insert_work(task);
164183
}
165184

@@ -198,27 +217,15 @@ impl SchedState {
198217
*LAST_UPDATE.borrow_mut() = now();
199218

200219
let weight = self.run_q.weight();
201-
let cpu_id = CpuId::this().value() as u64;
202-
let new_info = (cpu_id & 0xffff) | ((weight & 0xffffffffffff) << WEIGHT_SHIFT);
203-
let mut old_info = LEAST_TASKED_CPU_INFO.load(Ordering::Acquire);
204-
// Ensure we don't spin forever (possible with a larger number of CPUs)
205-
const MAX_RETRIES: usize = 8;
206-
// Ensure consistency
207-
for _ in 0..MAX_RETRIES {
208-
let old_cpu_id = old_info & 0xffff;
209-
let old_weight = old_info >> WEIGHT_SHIFT;
210-
if (cpu_id == old_cpu_id && old_info != new_info) || (weight < old_weight) {
211-
match LEAST_TASKED_CPU_INFO.compare_exchange(
212-
old_info,
213-
new_info,
214-
Ordering::AcqRel,
215-
Ordering::Acquire,
216-
) {
217-
Ok(_) => break,
218-
Err(x) => old_info = x,
219-
}
220-
}
221-
}
220+
let current_task = self.run_q.current().task.descriptor().tid();
221+
SHARED_SCHED_STATE
222+
.get()
223+
.current_task_id
224+
.store(current_task.value(), Ordering::Relaxed);
225+
SHARED_SCHED_STATE
226+
.get()
227+
.total_runq_weight
228+
.store(weight, Ordering::Relaxed);
222229
}
223230

224231
#[cfg(not(feature = "smp"))]
@@ -245,6 +252,20 @@ impl SchedState {
245252
}
246253
}
247254

255+
pub struct SharedSchedState {
256+
pub current_task_id: AtomicU32,
257+
pub total_runq_weight: AtomicU64,
258+
}
259+
260+
impl SharedSchedState {
261+
pub fn new() -> Self {
262+
Self {
263+
current_task_id: AtomicU32::new(0),
264+
total_runq_weight: AtomicU64::new(0),
265+
}
266+
}
267+
}
268+
248269
pub fn sched_init() {
249270
let init_task = OwnedTask::create_init_task();
250271

src/sched/runqueue/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
};
99
use alloc::{boxed::Box, collections::binary_heap::BinaryHeap, sync::Arc, vec::Vec};
1010
use core::{cmp, ptr, sync::atomic::Ordering};
11+
use libkernel::CpuOps;
1112
use vclock::VClock;
1213

1314
mod vclock;
@@ -116,6 +117,7 @@ impl RunQueue {
116117
// Task wants to deactivate. Drop the RunnableTask now to
117118
// restore sched_data.
118119
let work = cur_task.task.clone();
120+
cur_task.sched_data.last_cpu = ArchImpl::id();
119121
self.total_weight = self.total_weight.saturating_sub(cur_task.weight() as u64);
120122
drop(cur_task);
121123

src/sched/sched_task/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub struct Work {
2222
pub sched_data: SpinLock<Option<SchedulerData>>,
2323
}
2424

25+
pub const NR_CPUS: usize = 256;
26+
pub const CPU_MASK_SIZE: usize = NR_CPUS / 64;
27+
2528
#[derive(Clone)]
2629
pub struct SchedulerData {
2730
pub v_runtime: u128,
@@ -32,6 +35,8 @@ pub struct SchedulerData {
3235
pub exec_start: Option<Instant>,
3336
pub deadline: Option<Instant>,
3437
pub last_run: Option<Instant>,
38+
pub last_cpu: usize,
39+
pub cpu_mask: [u64; CPU_MASK_SIZE],
3540
}
3641

3742
impl SchedulerData {
@@ -43,6 +48,8 @@ impl SchedulerData {
4348
exec_start: None,
4449
deadline: None,
4550
last_run: None,
51+
last_cpu: usize::MAX,
52+
cpu_mask: [u64::MAX; CPU_MASK_SIZE],
4653
}
4754
}
4855
}

src/sched/waker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use alloc::sync::Arc;
22
use core::task::{RawWaker, RawWakerVTable, Waker};
33

44
use super::{
5-
SCHED_STATE,
5+
insert_work_cross_cpu,
66
sched_task::{Work, state::WakerAction},
77
};
88

@@ -25,7 +25,7 @@ unsafe fn wake_waker_no_consume(data: *const ()) {
2525

2626
match work.state.wake() {
2727
WakerAction::Enqueue => {
28-
SCHED_STATE.borrow_mut().run_q.add_work(work);
28+
insert_work_cross_cpu(work);
2929
}
3030
WakerAction::PreventedSleep | WakerAction::None => {}
3131
}

0 commit comments

Comments (0)