@@ -58,7 +58,6 @@ class ResourceJobFragment(JobFragment):
5858 max_rss_bytes : int | None = None
5959 avg_rss_bytes : float | None = None
6060 avg_cpu_percent : float | None = None
61- avg_cpu_per_core : list [float ] | None = None
6261
6362 num_resource_samples : int = 0
6463
@@ -69,50 +68,36 @@ class JobResourceAggregate:
6968 Tracks aggregate information over a number of samples whilst minimizing memory usage.
7069 """
7170
72- def __init__ (self , job : JobSpec , num_cores : int | None ) -> None :
71+ def __init__ (self , job : JobSpec ) -> None :
7372 """Construct an aggregate for storing sampling info for a given job specification.
7473
7574 Arguments:
7675 job: The specification of the job which is having its information aggregated.
77- num_cores: The number of logical CPU cores available on the host system.
7876
7977 """
8078 self .job_spec = job
81- self .num_cores = num_cores
8279 self .sample_count = 0
8380 self .sum_rss = 0.0
8481 self .max_rss = 0
8582 self .sum_cpu = 0.0
86- if num_cores :
87- self .sum_cpu_per_core = [0.0 ] * num_cores
8883
89- def add_sample (self , rss : int , cpu : float , cpu_per_core : list [ float ] ) -> None :
84+ def add_sample (self , rss : int , cpu : float ) -> None :
9085 """Aggregate an additional resource sample taken during this job's active window."""
9186 self .sample_count += 1
9287 self .sum_rss += rss
9388 self .max_rss = max (self .max_rss , rss )
9489 self .sum_cpu += cpu
95- if self .num_cores :
96- self .sum_cpu_per_core = [
97- total + n for total , n in zip (self .sum_cpu_per_core , cpu_per_core , strict = True )
98- ]
9990
10091 def finalize (self ) -> ResourceJobFragment :
10192 """Finalize the aggregated information for a job, generating a report fragment."""
10293 if self .sample_count == 0 :
10394 return ResourceJobFragment (self .job_spec )
10495
105- if self .num_cores :
106- avg_cpu_per_core = [x / self .sample_count for x in self .sum_cpu_per_core ]
107- else :
108- avg_cpu_per_core = None
109-
11096 return ResourceJobFragment (
11197 self .job_spec ,
11298 max_rss_bytes = self .max_rss ,
11399 avg_rss_bytes = self .sum_rss / self .sample_count ,
114100 avg_cpu_percent = self .sum_cpu / self .sample_count ,
115- avg_cpu_per_core = avg_cpu_per_core ,
116101 num_resource_samples = self .sample_count ,
117102 )
118103
@@ -218,7 +203,7 @@ def _sampling_loop(self) -> None:
218203 # Update all running job aggregates with system sample
219204 with self ._lock :
220205 for aggregate in self ._running_jobs .values ():
221- aggregate .add_sample (sys_rss , sys_cpu , sys_cpu_per_core )
206+ aggregate .add_sample (sys_rss , sys_cpu )
222207
223208 sleep_time = max (next_run_at - time .time (), 0 )
224209 time .sleep (sleep_time )
@@ -239,7 +224,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
239224 running = job_id in self ._running_jobs
240225 started = running or job_id in self ._finished_jobs
241226 if not started and status != JobStatus .QUEUED :
242- self ._running_jobs [job_id ] = JobResourceAggregate (job , self . _num_cores )
227+ self ._running_jobs [job_id ] = JobResourceAggregate (job )
243228 running = True
244229 if running and status .ended :
245230 aggregates = self ._running_jobs .pop (job_id )
0 commit comments