Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions xet_data/src/progress_tracking/progress_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ impl ItemProgress {
item_name: self.name.to_string(),
total_bytes,
bytes_completed,
transfer_bytes,
transfer_bytes_completed,
}
}
}
Expand Down Expand Up @@ -380,6 +382,8 @@ pub struct ItemProgressReport {
pub item_name: String,
pub total_bytes: u64,
pub bytes_completed: u64,
pub transfer_bytes: u64,
pub transfer_bytes_completed: u64,
}

#[cfg(test)]
Expand Down
114 changes: 108 additions & 6 deletions xet_pkg/src/legacy/progress_tracking/callback_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ impl GroupBridgeState {
for (&id, report) in &items {
let prev = self.prev_items.get(&id);
let prev_completed = prev.map_or(0, |p| p.bytes_completed);
let prev_transfer_completed = prev.map_or(0, |p| p.transfer_bytes_completed);
let increment = report.bytes_completed.saturating_sub(prev_completed);
let transfer_increment = report.transfer_bytes_completed.saturating_sub(prev_transfer_completed);

if increment > 0 || prev.is_none() {
if increment > 0 || transfer_increment > 0 || prev.is_none() {
item_updates.push(ItemProgressUpdate {
tracking_id: id,
item_name: Arc::from(report.item_name.as_str()),
Expand Down Expand Up @@ -123,11 +125,17 @@ impl ItemBridgeState {
fn compute_diff(&mut self, item_id: UniqueID, report: ItemProgressReport) -> ProgressUpdate {
let prev_completed = self.prev.as_ref().map_or(0, |p| p.bytes_completed);
let prev_total = self.prev.as_ref().map_or(0, |p| p.total_bytes);
let prev_transfer_bytes = self.prev.as_ref().map_or(0, |p| p.transfer_bytes);
let prev_transfer_completed = self.prev.as_ref().map_or(0, |p| p.transfer_bytes_completed);

let bytes_increment = report.bytes_completed.saturating_sub(prev_completed);
let total_increment = report.total_bytes.saturating_sub(prev_total);
let transfer_bytes_increment = report.transfer_bytes.saturating_sub(prev_transfer_bytes);
let transfer_completion_increment = report.transfer_bytes_completed.saturating_sub(prev_transfer_completed);

let item_updates = if bytes_increment > 0 || self.prev.is_none() {
let has_progress = bytes_increment > 0 || transfer_completion_increment > 0 || self.prev.is_none();

let item_updates = if has_progress {
vec![ItemProgressUpdate {
tracking_id: item_id,
item_name: Arc::from(report.item_name.as_str()),
Expand All @@ -146,10 +154,10 @@ impl ItemBridgeState {
total_bytes_completed: report.bytes_completed,
total_bytes_completion_increment: bytes_increment,
total_bytes_completion_rate: None,
total_transfer_bytes: 0,
total_transfer_bytes_increment: 0,
total_transfer_bytes_completed: 0,
total_transfer_bytes_completion_increment: 0,
total_transfer_bytes: report.transfer_bytes,
total_transfer_bytes_increment: transfer_bytes_increment,
total_transfer_bytes_completed: report.transfer_bytes_completed,
total_transfer_bytes_completion_increment: transfer_completion_increment,
total_transfer_bytes_completion_rate: None,
};

Expand Down Expand Up @@ -347,6 +355,24 @@ mod tests {
item_name: name.to_string(),
total_bytes,
bytes_completed,
transfer_bytes: 0,
transfer_bytes_completed: 0,
}
}

fn make_item_report_with_transfer(
name: &str,
total_bytes: u64,
bytes_completed: u64,
transfer_bytes: u64,
transfer_bytes_completed: u64,
) -> ItemProgressReport {
ItemProgressReport {
item_name: name.to_string(),
total_bytes,
bytes_completed,
transfer_bytes,
transfer_bytes_completed,
}
}

Expand Down Expand Up @@ -432,6 +458,27 @@ mod tests {
assert_eq!(update.item_updates[0].bytes_completion_increment, 0);
}

#[test]
fn test_group_bridge_transfer_progress_includes_item() {
let mut state = GroupBridgeState::new();
let id = UniqueID::new();

// Initial state
let group1 = make_group_report(1000, 0, 800, 0);
let items1 = HashMap::from([(id, make_item_report_with_transfer("a.bin", 1000, 0, 800, 0))]);
state.compute_diff(group1, items1);

// Transfer progress only (bytes_completed unchanged)
let group2 = make_group_report(1000, 0, 800, 200);
let items2 = HashMap::from([(id, make_item_report_with_transfer("a.bin", 1000, 0, 800, 200))]);
let update = state.compute_diff(group2, items2);

assert_eq!(update.total_transfer_bytes_completion_increment, 200);
assert_eq!(update.total_bytes_completion_increment, 0);
// Item should be included despite bytes_completed not changing
assert_eq!(update.item_updates.len(), 1);
}

#[test]
fn test_item_bridge_first_diff() {
let mut state = ItemBridgeState::new();
Expand Down Expand Up @@ -486,4 +533,59 @@ mod tests {
assert_eq!(update.total_bytes_completion_increment, 0);
assert!(update.item_updates.is_empty());
}

#[test]
fn test_item_bridge_transfer_bytes_reported() {
let mut state = ItemBridgeState::new();
let id = UniqueID::new();

let report = make_item_report_with_transfer("file.bin", 1000, 0, 800, 200);
let update = state.compute_diff(id, report);

assert_eq!(update.total_transfer_bytes, 800);
assert_eq!(update.total_transfer_bytes_completed, 200);
assert_eq!(update.total_transfer_bytes_completion_increment, 200);
}

#[test]
fn test_item_bridge_transfer_bytes_incremental() {
let mut state = ItemBridgeState::new();
let id = UniqueID::new();

state.compute_diff(id, make_item_report_with_transfer("file.bin", 1000, 0, 800, 200));

let update = state.compute_diff(id, make_item_report_with_transfer("file.bin", 1000, 0, 800, 500));

assert_eq!(update.total_transfer_bytes_increment, 0);
assert_eq!(update.total_transfer_bytes_completion_increment, 300);
}

#[test]
fn test_item_bridge_transfer_progress_triggers_callback() {
let mut state = ItemBridgeState::new();
let id = UniqueID::new();

// Initial state: no bytes completed, no transfer
state.compute_diff(id, make_item_report_with_transfer("file.bin", 1000, 0, 800, 0));

// Transfer progress without bytes_completed change should still produce an update
let update = state.compute_diff(id, make_item_report_with_transfer("file.bin", 1000, 0, 800, 100));

assert!(!update.is_empty());
assert_eq!(update.total_transfer_bytes_completion_increment, 100);
assert_eq!(update.total_bytes_completion_increment, 0);
}

#[test]
fn test_item_bridge_no_transfer_no_bytes_is_empty() {
let mut state = ItemBridgeState::new();
let id = UniqueID::new();

state.compute_diff(id, make_item_report_with_transfer("file.bin", 1000, 100, 800, 200));

// Same values again: should be empty
let update = state.compute_diff(id, make_item_report_with_transfer("file.bin", 1000, 100, 800, 200));

assert!(update.is_empty());
}
}