56 changes: 55 additions & 1 deletion cpp/include/cuvs/util/file_io.hpp
@@ -1,17 +1,19 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <raft/core/error.hpp>
#include <raft/core/serialize.hpp>

#include <algorithm>
#include <cstring>
#include <istream>
#include <limits.h>
#include <memory>
#include <ostream>
#include <sstream>
#include <streambuf>
#include <string>
#include <utility>
@@ -166,6 +168,58 @@ class file_descriptor {
std::string path_;
};

/**
* @brief Create a numpy file with pre-allocated space and write the header.
*
* Opens the file, pre-allocates space for the numpy header plus array data, and writes
* the header. This is useful for memory-mapped or streaming writes of the data region.
*
* @tparam T Data type for the numpy array
* @param path File path to create
* @param shape Shape of the numpy array (e.g., {rows, cols} for 2D)
* @return Pair of (file_descriptor, header_size); header_size is the byte offset where the array data begins
*/
template <typename T>
std::pair<file_descriptor, size_t> create_numpy_file(const std::string& path,
const std::vector<size_t>& shape)
{
// Open file
file_descriptor fd(path, O_CREAT | O_RDWR | O_TRUNC, 0644);

// Build header
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<T>();
const bool fortran_order = false;
const raft::detail::numpy_serializer::header_t header = {dtype, fortran_order, shape};

std::stringstream ss;
raft::detail::numpy_serializer::write_header(ss, header);
std::string header_str = ss.str();
size_t header_size = header_str.size();

// Calculate data size from shape
size_t data_bytes = sizeof(T);
for (auto dim : shape) {
data_bytes *= dim;
}

// Pre-allocate file space
if (posix_fallocate(fd.get(), 0, header_size + data_bytes) != 0) {
RAFT_FAIL("Failed to pre-allocate space for file: %s", path.c_str());
}

// Seek to beginning and write header
if (lseek(fd.get(), 0, SEEK_SET) == -1) {
RAFT_FAIL("Failed to seek to beginning of file: %s", path.c_str());
}

ssize_t written = write(fd.get(), header_str.data(), header_str.size());
if (written < 0 || static_cast<size_t>(written) != header_str.size()) {
RAFT_FAIL("Failed to write numpy header to file: %s", path.c_str());
}

return {std::move(fd), header_size};
}
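
For reference, a minimal usage sketch of create_numpy_file, assuming the helper is included as <cuvs/util/file_io.hpp>, that file_descriptor closes its fd on destruction, and that the caller fills the data region with pwrite at offset header_size; the "example.npy" path and sample values are illustrative only.

#include <cuvs/util/file_io.hpp>  // assumed include path for the helper above

#include <unistd.h>  // pwrite

#include <cstddef>
#include <vector>

void write_example_numpy()
{
  // Create "example.npy" describing a 4 x 8 float array; the numpy header is
  // written and header_size + 4 * 8 * sizeof(float) bytes are pre-allocated.
  auto [fd, header_size] = cuvs::util::create_numpy_file<float>("example.npy", {4, 8});

  // Fill the data region that follows the header, one row per pwrite call.
  std::vector<float> row(8, 1.0f);
  for (std::size_t i = 0; i < 4; ++i) {
    off_t offset = static_cast<off_t>(header_size + i * row.size() * sizeof(float));
    if (pwrite(fd.get(), row.data(), row.size() * sizeof(float), offset) < 0) {
      return;  // handle the I/O error as appropriate for the caller
    }
  }
  // Assumption: file_descriptor closes the underlying fd when it goes out of scope.
}
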

/**
* @brief Read large file in chunks using pread
*
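
The chunked-read helper's full signature is collapsed in this diff, so the following is only a generic sketch of the chunked-pread pattern its comment describes; the read_file_chunked name, its parameters, and the 1 MiB chunk size are hypothetical.

#include <unistd.h>  // pread

#include <algorithm>
#include <cstddef>
#include <vector>

// Read file_bytes from fd into out, chunk_bytes at a time, tolerating short reads.
bool read_file_chunked(int fd, std::size_t file_bytes, std::vector<char>& out,
                       std::size_t chunk_bytes = std::size_t{1} << 20)
{
  out.resize(file_bytes);
  std::size_t done = 0;
  while (done < file_bytes) {
    std::size_t want = std::min(chunk_bytes, file_bytes - done);
    ssize_t got = pread(fd, out.data() + done, want, static_cast<off_t>(done));
    if (got < 0) { return false; }   // I/O error
    if (got == 0) { return false; }  // unexpected end of file
    done += static_cast<std::size_t>(got);
  }
  return true;
}
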
132 changes: 9 additions & 123 deletions cpp/src/neighbors/detail/cagra/cagra_build.cuh
@@ -952,132 +952,18 @@ index<T, IdxT> build_ace(raft::resources const& res,
// Mark for cleanup if we fail after creating the directory
cleanup_on_failure = true;

// Helper lambda to write numpy header to file descriptor
auto write_numpy_header = [](int fd,
const std::vector<size_t>& shape,
const raft::detail::numpy_serializer::dtype_t& dtype) {
std::stringstream ss;
// Create numpy files with pre-allocated space
std::tie(reordered_fd, reordered_header_size) = cuvs::util::create_numpy_file<T>(
build_dir + "/reordered_dataset.npy", {dataset_size, dataset_dim});

const bool fortran_order = false;
const raft::detail::numpy_serializer::header_t header = {dtype, fortran_order, shape};
std::tie(augmented_fd, augmented_header_size) = cuvs::util::create_numpy_file<T>(
build_dir + "/augmented_dataset.npy", {dataset_size, dataset_dim});

raft::detail::numpy_serializer::write_header(ss, header);
std::tie(mapping_fd, mapping_header_size) =
cuvs::util::create_numpy_file<IdxT>(build_dir + "/dataset_mapping.npy", {dataset_size});

std::string header_str = ss.str();
ssize_t written = write(fd, header_str.data(), header_str.size());
if (written < 0 || static_cast<size_t>(written) != header_str.size()) {
RAFT_FAIL("Failed to write numpy header to file descriptor");
}
return header_str.size();
};

// Create and allocate dataset file
reordered_fd = cuvs::util::file_descriptor(
build_dir + "/reordered_dataset.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
{
std::stringstream ss;
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<T>();
const bool fortran_order = false;
const raft::detail::numpy_serializer::header_t header = {
dtype, fortran_order, {dataset_size, dataset_dim}};
raft::detail::numpy_serializer::write_header(ss, header);
reordered_header_size = ss.str().size();
}
if (posix_fallocate(reordered_fd.get(),
0,
reordered_header_size + dataset_size * dataset_dim * sizeof(T)) != 0) {
RAFT_FAIL("Failed to pre-allocate space for reordered dataset file");
}
{
auto dtype_for_dataset = raft::detail::numpy_serializer::get_numpy_dtype<T>();
RAFT_LOG_DEBUG("Writing reordered_dataset.npy header: shape=[%zu,%zu], dtype=%c",
dataset_size,
dataset_dim,
dtype_for_dataset.kind);
if (lseek(reordered_fd.get(), 0, SEEK_SET) == -1) {
RAFT_FAIL("Failed to seek to beginning of reordered dataset file");
}
write_numpy_header(reordered_fd.get(), {dataset_size, dataset_dim}, dtype_for_dataset);
}

// Create and allocate augmented dataset file
augmented_fd = cuvs::util::file_descriptor(
build_dir + "/augmented_dataset.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
{
std::stringstream ss;
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<T>();
const bool fortran_order = false;
const raft::detail::numpy_serializer::header_t header = {
dtype, fortran_order, {dataset_size, dataset_dim}};
raft::detail::numpy_serializer::write_header(ss, header);
augmented_header_size = ss.str().size();
}
if (posix_fallocate(augmented_fd.get(),
0,
augmented_header_size + dataset_size * dataset_dim * sizeof(T)) != 0) {
RAFT_FAIL("Failed to pre-allocate space for augmented dataset file");
}
// Seek to beginning before writing header
if (lseek(augmented_fd.get(), 0, SEEK_SET) == -1) {
RAFT_FAIL("Failed to seek to beginning of augmented dataset file");
}
write_numpy_header(augmented_fd.get(),
{dataset_size, dataset_dim},
raft::detail::numpy_serializer::get_numpy_dtype<T>());

// Create and allocate mapping file
mapping_fd = cuvs::util::file_descriptor(
build_dir + "/dataset_mapping.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
{
std::stringstream ss;
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
const bool fortran_order = false;
const raft::detail::numpy_serializer::header_t header = {
dtype, fortran_order, {dataset_size}};
raft::detail::numpy_serializer::write_header(ss, header);
mapping_header_size = ss.str().size();
}
if (posix_fallocate(mapping_fd.get(), 0, mapping_header_size + dataset_size * sizeof(IdxT)) !=
0) {
RAFT_FAIL("Failed to pre-allocate space for dataset mapping file");
}
{
auto dtype_for_mapping = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
RAFT_LOG_DEBUG("Writing dataset_mapping.npy header: shape=[%zu], dtype=%c",
dataset_size,
dtype_for_mapping.kind);
if (lseek(mapping_fd.get(), 0, SEEK_SET) == -1) {
RAFT_FAIL("Failed to seek to beginning of mapping file");
}
write_numpy_header(mapping_fd.get(), {dataset_size}, dtype_for_mapping);
}

// Create and allocate graph file
graph_fd = cuvs::util::file_descriptor(
build_dir + "/cagra_graph.npy", O_CREAT | O_RDWR | O_TRUNC, 0644);
{
std::stringstream ss;
const auto dtype = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
const bool fortran_order = false;
const raft::detail::numpy_serializer::header_t header = {
dtype, fortran_order, {dataset_size, graph_degree}};
raft::detail::numpy_serializer::write_header(ss, header);
graph_header_size = ss.str().size();
}
if (posix_fallocate(graph_fd.get(), 0, graph_header_size + cagra_graph_size) != 0) {
RAFT_FAIL("Failed to pre-allocate space for graph file");
}
{
auto dtype_for_graph = raft::detail::numpy_serializer::get_numpy_dtype<IdxT>();
RAFT_LOG_DEBUG("Writing cagra_graph.npy header: shape=[%zu,%zu], dtype=%c",
dataset_size,
graph_degree,
dtype_for_graph.kind);
if (lseek(graph_fd.get(), 0, SEEK_SET) == -1) {
RAFT_FAIL("Failed to seek to beginning of graph file");
}
write_numpy_header(graph_fd.get(), {dataset_size, graph_degree}, dtype_for_graph);
}
std::tie(graph_fd, graph_header_size) = cuvs::util::create_numpy_file<IdxT>(
build_dir + "/cagra_graph.npy", {dataset_size, graph_degree});

RAFT_LOG_DEBUG(
"ACE: Wrote numpy headers (reordered: %zu, augmented: %zu, mapping: %zu, graph: %zu bytes)",