Commits
26 commits
26fd4af
ilmd + sqlitereader
RasmusOrsoe Nov 4, 2025
cf39cc0
automatically infer memmap size
RasmusOrsoe Nov 4, 2025
fbdd12b
add method for identifying serialization method used in lmdbwriter
RasmusOrsoe Nov 4, 2025
b3fd015
fix serialization of lambda func
RasmusOrsoe Nov 4, 2025
3c2e3c1
add query_database
RasmusOrsoe Nov 4, 2025
c397d2f
add `get_all_indices` utility method
RasmusOrsoe Nov 4, 2025
bd04276
allow lists of data representations to lmdbwriter
RasmusOrsoe Nov 4, 2025
4ffa2c6
add lmdb dataset
RasmusOrsoe Nov 6, 2025
6255999
adjust `GraphNeTDataModule` to accept lmdb
RasmusOrsoe Nov 6, 2025
70c452b
Error handling in lmdbwriter
RasmusOrsoe Nov 11, 2025
8698fdc
add missing truth variables to data representation in lmdb
RasmusOrsoe Nov 12, 2025
948e4d3
add meta_data to lmdb
RasmusOrsoe Nov 17, 2025
4fc40f5
add `SQLiteToLMDBConverter`
RasmusOrsoe Dec 9, 2025
fe19562
add unit tests for lmdb
RasmusOrsoe Dec 9, 2025
9212fdb
update docs and add lmdb to setup.py
RasmusOrsoe Dec 9, 2025
97807dc
update conversion example with lmdb backend
RasmusOrsoe Dec 9, 2025
54d4be0
Merge branch 'main' into lmdb_pr
RasmusOrsoe Dec 9, 2025
c12049b
# noqa: C901
RasmusOrsoe Dec 9, 2025
602c3cf
Merge branch 'lmdb_pr' of https://github.com/RasmusOrsoe/graphnet int…
RasmusOrsoe Dec 9, 2025
28ba1cf
mypy update
RasmusOrsoe Dec 9, 2025
01af352
mypy
RasmusOrsoe Dec 9, 2025
3e4a88d
update missing column logic
RasmusOrsoe Dec 9, 2025
b055391
remove stray function call in `test_dataconverters_and_datasets.py`
RasmusOrsoe Dec 9, 2025
ea6ee69
expand unit tests
RasmusOrsoe Dec 9, 2025
72d248a
Update deprecated converters in unit test
RasmusOrsoe Dec 9, 2025
206f7a4
fix unit test
RasmusOrsoe Dec 9, 2025
4 changes: 2 additions & 2 deletions docs/source/data_conversion/data_conversion.rst
@@ -265,8 +265,8 @@ In this example, the writer will save the entire set of extractor outputs - a di



Two writers are implemented in GraphNeT; the :code:`SQLiteWriter` and :code:`ParquetWriter`, each of which output files that are directly used for
training by :code:`ParquetDataset` and :code:`SQLiteDataset`.
Three writers are implemented in GraphNeT: the :code:`SQLiteWriter`, :code:`ParquetWriter`, and :code:`LMDBWriter`, each of which outputs files that are directly used for
training by :code:`SQLiteDataset`, :code:`ParquetDataset`, and :code:`LMDBDataset`, respectively.
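
As a minimal, hedged sketch of the LMDB path (the constructor arguments below are placeholders rather than the documented signature; see :code:`examples/01_icetray/01_convert_i3_files.py` for a complete, runnable example), the pre-configured :code:`I3ToLMDBConverter` follows the same convert-then-merge pattern as the other backends:

.. code-block:: python

    from graphnet.data.pre_configured.dataconverters import I3ToLMDBConverter

    # NOTE: all arguments other than `workers` are placeholders; consult the
    # example script above for the exact constructor signature.
    converter = I3ToLMDBConverter(
        ...,  # extractors, output directory, etc.
        workers=1,
    )
    converter(inputs)        # a list of I3-files or an input directory
    converter.merge_files()  # merge intermediate outputs into a single LMDB database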



54 changes: 45 additions & 9 deletions docs/source/datasets/datasets.rst
@@ -155,18 +155,19 @@ It looks like so:
</details>


:code:`SQLiteDataset` & :code:`ParquetDataset`
----------------------------------------------
:code:`SQLiteDataset`, :code:`ParquetDataset` & :code:`LMDBDataset`
--------------------------------------------------------------------

The two specific implementations of :code:`Dataset` exists :
Three specific implementations of :code:`Dataset` exist:

- `ParquetDataset <https://graphnet-team.github.io/graphnet/api/graphnet.data.parquet.parquet_dataset.html>`_ : Constructs :code:`Dataset` from files created by :code:`ParquetWriter`.
- `SQLiteDataset <https://graphnet-team.github.io/graphnet/api/graphnet.data.sqlite.sqlite_dataset.html>`_ : Constructs :code:`Dataset` from files created by :code:`SQLiteWriter`.
- `LMDBDataset <https://graphnet-team.github.io/graphnet/api/graphnet.data.dataset.lmdb.lmdb_dataset.html>`_ : Constructs :code:`Dataset` from files created by :code:`LMDBWriter`.


To instantiate a :code:`Dataset` from your files, you must specify at least the following:

- :code:`pulsemaps`: These are named fields in your Parquet files, or tables in your SQLite databases, which store one or more pulse series from which you would like to create a dataset. A pulse series represents the detector response, in the form of a series of PMT hits or pulses, in some time window, usually triggered by a single neutrino or atmospheric muon interaction. This is the data that will be served as input to the `Model`.
- :code:`pulsemaps`: These are named fields in your Parquet files, or tables in your SQLite or LMDB databases, which store one or more pulse series from which you would like to create a dataset. A pulse series represents the detector response, in the form of a series of PMT hits or pulses, in some time window, usually triggered by a single neutrino or atmospheric muon interaction. This is the data that will be served as input to the `Model`.
- :code:`truth_table`: The name of a table/array that contains the truth-level information associated with the pulse series, and should contain the truth labels that you would like to reconstruct or classify. Often this table will contain the true physical attributes of the primary particle — such as its true direction, energy, PID, etc. — and is therefore graph- or event-level (as opposed to the pulse series tables, which are node- or hit-level) truth information.
- :code:`features`: The names of the columns in your pulse series table(s) that you would like to include for training; they typically constitute the per-node/-hit features such as xyz-position of sensors, charge, and photon arrival times.
- :code:`truth`: The columns in your truth table/array that you would like to include in the dataset.
@@ -225,6 +226,32 @@ Or similarly for Parquet files:

graph = dataset[0] # torch_geometric.data.Data

Or similarly for LMDB files:

.. code-block:: python

    from graphnet.data.dataset.lmdb.lmdb_dataset import LMDBDataset
    from graphnet.models.detector.prometheus import Prometheus
    from graphnet.models.graphs import KNNGraph
    from graphnet.models.graphs.nodes import NodesAsPulses

    graph_definition = KNNGraph(
        detector=Prometheus(),
        node_definition=NodesAsPulses(),
        nb_nearest_neighbours=8,
    )

    dataset = LMDBDataset(
        path="data/examples/lmdb/prometheus/prometheus-events.lmdb",
        pulsemaps="total",
        truth_table="mc_truth",
        features=["sensor_pos_x", "sensor_pos_y", "sensor_pos_z", "t", ...],
        truth=["injection_energy", "injection_zenith", ...],
        graph_definition=graph_definition,
    )

    graph = dataset[0]  # torch_geometric.data.Data

It's then straightforward to create a :code:`DataLoader` for training, which will take care of batching, shuffling, and such:

.. code-block:: python
@@ -250,10 +277,10 @@ By default, the following fields will be available in a graph built by :code:`Da
- :code:`graph[truth_label] for truth_label in truth`: For each truth label in the :code:`truth` argument, the corresponding data is stored as a :code:`[num_rows, 1]` dimensional tensor. E.g., :code:`graph["energy"] = torch.tensor(26, dtype=torch.float)`
- :code:`graph[feature] for feature in features`: For each feature given in the :code:`features` argument, the corresponding data is stored as a :code:`[num_rows, 1]` dimensional tensor. E.g., :code:`graph["sensor_x"] = torch.tensor([100, -200, -300, 200], dtype=torch.float)`
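
As a quick illustration (a sketch assuming the Prometheus example dataset defined above), these fields can be read directly off a single graph:

.. code-block:: python

    graph = dataset[0]  # torch_geometric.data.Data

    # Truth labels requested via the `truth` argument are attached as tensors
    print(graph["injection_energy"])

    # Per-hit features requested via the `features` argument are attached as
    # well, with one entry per pulse/hit
    print(graph["sensor_pos_x"].shape)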

:code:`SQLiteDataset` vs. :code:`ParquetDataset`
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:code:`SQLiteDataset` vs. :code:`ParquetDataset` vs. :code:`LMDBDataset`
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Besides working on different file formats, :code:`SQLiteDataset` and :code:`ParquetDataset` have significant differences,
Besides working on different file formats, :code:`SQLiteDataset`, :code:`ParquetDataset`, and :code:`LMDBDataset` have significant differences,
which may lead you to choose one over the others, depending on the problem at hand.

:SQLiteDataset: SQLite provides fast random access to all events inside it. This makes plotting and subsampling your dataset particularly easy,
@@ -265,13 +292,20 @@ which may lead you to choose one over the other, depending on the problem at han
This means that the subsampling of your dataset needs to happen prior to the conversion to :code:`parquet`, unlike :code:`SQLiteDataset`, which allows for subsampling after conversion due to its fast random access.
Conversion of files to :code:`parquet` is significantly faster than its :code:`SQLite` counterpart.

:LMDBDataset: LMDB databases produced by :code:`LMDBWriter` store events as key-value pairs with configurable serialization methods (pickle, json, msgpack, dill).
:code:`LMDBDataset` supports two modes: it can read raw tables and compute data representations in real time (similar to :code:`SQLiteDataset`), or it can read pre-computed data representations directly from the database for faster access.
LMDB provides fast random access similar to SQLite, while also supporting efficient storage of pre-computed graph representations.
LMDB databases take up roughly half the space of their SQLite counterparts, making LMDB a good compromise between SQLite and Parquet.


.. note::

:code:`ParquetDataset` is scalable to ultra large datasets, but is more difficult to work with and has a higher memory consumption.

:code:`SQLiteDataset` does not scale to very large datasets, but is easy to work with and has minimal memory consumption.

:code:`LMDBDataset` provides a balance between SQLite and Parquet, offering fast random access and support for pre-computed representations, making it well-suited for scenarios where data representations are computed once and reused multiple times.


Choosing a subset of events using `selection`
----------------------------------------------
@@ -297,7 +331,7 @@ would produce a :code:`Dataset` with only those five events.

.. note::

For :code:`SQLiteDatase`, the :code:`selection` argument specifies individual events chosen for the dataset,
For :code:`SQLiteDataset` and :code:`LMDBDataset`, the :code:`selection` argument specifies individual events chosen for the dataset,
whereas for :code:`ParquetDataset`, the :code:`selection` argument specifies which batches are used in the dataset.
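
As a minimal sketch (reusing the LMDB example from earlier on this page), selecting five individual events could look like:

.. code-block:: python

    dataset = LMDBDataset(
        ...,  # same arguments as in the LMDB example above
        selection=[0, 1, 2, 3, 4],  # five individual events
    )

For :code:`ParquetDataset`, the same :code:`selection` list would instead pick out the first five batches.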


@@ -347,12 +381,14 @@ You can combine multiple instances of :code:`Dataset` from GraphNeT into a singl
from graphnet.data import EnsembleDataset
from graphnet.data.parquet import ParquetDataset
from graphnet.data.sqlite import SQLiteDataset
from graphnet.data.dataset.lmdb.lmdb_dataset import LMDBDataset

dataset_1 = SQLiteDataset(...)
dataset_2 = SQLiteDataset(...)
dataset_3 = ParquetDataset(...)
dataset_4 = LMDBDataset(...)

ensemble_dataset = EnsembleDataset([dataset_1, dataset_2, dataset_3])
ensemble_dataset = EnsembleDataset([dataset_1, dataset_2, dataset_3, dataset_4])

You can find a detailed example `here <https://github.com/graphnet-team/graphnet/blob/main/examples/02_data/04_ensemble_dataset.py>`_ .

16 changes: 12 additions & 4 deletions examples/01_icetray/01_convert_i3_files.py
@@ -1,4 +1,4 @@
"""Example of converting I3-files to SQLite and Parquet."""
"""Example of converting I3-files to SQLite, Parquet, and LMDB."""

from glob import glob

@@ -12,6 +12,7 @@
from graphnet.data.dataconverter import DataConverter
from graphnet.data.parquet import ParquetDataConverter
from graphnet.data.sqlite import SQLiteDataConverter
from graphnet.data.pre_configured.dataconverters import I3ToLMDBConverter
from graphnet.utilities.argparse import ArgumentParser
from graphnet.utilities.imports import has_icecube_package
from graphnet.utilities.logging import Logger
@@ -29,6 +30,7 @@
)

CONVERTER_CLASS = {
"lmdb": I3ToLMDBConverter,
"sqlite": SQLiteDataConverter,
"parquet": ParquetDataConverter,
}
@@ -55,7 +57,7 @@ def main_icecube86(backend: str) -> None:
workers=1,
)
converter(inputs)
if backend == "sqlite":
if backend in ["sqlite", "lmdb"]:
converter.merge_files()


@@ -83,7 +85,7 @@ def main_icecube_upgrade(backend: str) -> None:
gcd_rescue=gcd_rescue,
)
converter(inputs)
if backend == "sqlite":
if backend in ["sqlite", "lmdb"]:
converter.merge_files()


@@ -99,7 +101,13 @@ def main_icecube_upgrade(backend: str) -> None:
"""
)

parser.add_argument("backend", choices=["sqlite", "parquet"])
parser.add_argument(
"backend",
nargs="?",
choices=["lmdb", "sqlite", "parquet"],
default="lmdb",
help="Backend format to convert to (default: %(default)s)",
)
parser.add_argument(
"detector", choices=["icecube-86", "icecube-upgrade"]
)
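# Example invocation (assuming the script is run from the repository root):
#   python examples/01_icetray/01_convert_i3_files.py lmdb icecube-86
# The backend argument may be omitted, in which case it defaults to "lmdb".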
1 change: 1 addition & 0 deletions setup.py
@@ -27,6 +27,7 @@
"polars >=0.19",
"torchscale==0.2.0",
"h5py>= 3.7.0",
"lmdb>=1.4.1",
]

EXTRAS_REQUIRE = {
6 changes: 6 additions & 0 deletions src/graphnet/data/dataclasses.py
@@ -10,6 +10,12 @@ class I3FileSet: # noqa: D101
gcd_file: str


@dataclass
class SQLiteFileSet: # noqa: D101
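# Pairs an input SQLite database (db_path) with the subset of event numbers
# (event_nos) to be processed from it.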
db_path: str
event_nos: List[int]


@dataclass
class Settings:
"""Dataclass for workers in I3Deployer."""
36 changes: 28 additions & 8 deletions src/graphnet/data/dataconverter.py
@@ -19,11 +19,11 @@
from .extractors import Extractor
from .extractors.icecube import I3Extractor
from .extractors.liquido import H5Extractor
from .extractors.internal import ParquetExtractor
from .extractors.internal import ParquetExtractor, SQLiteExtractor
from .extractors.prometheus import PrometheusExtractor
from .extractors.km3net import KM3NeTExtractor

from .dataclasses import I3FileSet
from .dataclasses import I3FileSet, SQLiteFileSet


def init_global_index(index: Synchronized, output_files: List[str]) -> None:
@@ -52,6 +52,7 @@ def __init__(
List[ParquetExtractor],
List[H5Extractor],
List[PrometheusExtractor],
List[SQLiteExtractor],
List[KM3NeTExtractor],
],
index_column: str = "event_no",
@@ -115,7 +116,7 @@ def __call__(self, input_dir: Union[str, List[str]]) -> None:
@final
def _launch_jobs(
self,
input_files: Union[List[str], List[I3FileSet]],
input_files: Union[List[str], List[I3FileSet], List[SQLiteFileSet]],
) -> None:
"""Multi Processing Logic.

@@ -137,7 +138,9 @@ def _launch_jobs(
self._update_shared_variables(pool)

@final
def _process_file(self, file_path: Union[str, I3FileSet]) -> None:
def _process_file(
self, file_path: Union[str, I3FileSet, SQLiteFileSet]
) -> None:
"""Process a single file.

Calls file reader to receive extracted output, event ids is
@@ -183,15 +186,32 @@ def _process_file(self, file_path: Union[str, I3FileSet]) -> None:
)

@final
def _create_file_name(self, input_file_path: Union[str, I3FileSet]) -> str:
def _create_file_name(
self, input_file_path: Union[str, I3FileSet, SQLiteFileSet]
) -> str:
"""Convert input file path to an output file name."""
# Handle different input types
event_subset_suffix = ""
if isinstance(input_file_path, I3FileSet):
input_file_path = input_file_path.i3_file
file_name = os.path.basename(input_file_path)
actual_path = input_file_path.i3_file
elif isinstance(input_file_path, SQLiteFileSet):
actual_path = input_file_path.db_path
# Add subset identifier based on first and last event numbers
event_nos = input_file_path.event_nos
if len(event_nos) > 0:
event_subset_suffix = f"_{event_nos[0]}_{event_nos[-1]}"
else:
actual_path = input_file_path

# Now actual_path is always a string
file_name = os.path.basename(actual_path)
file_name_without_extension = file_name
for ext in self._file_reader._accepted_file_extensions:
if file_name.endswith(ext):
file_name_without_extension = file_name.replace(ext, "")
return file_name_without_extension.replace(".i3", "")
return (file_name_without_extension + event_subset_suffix).replace(
".i3", ""
)

@final
def _assign_event_no(
31 changes: 26 additions & 5 deletions src/graphnet/data/datamodule.py
@@ -11,6 +11,7 @@
EnsembleDataset,
SQLiteDataset,
ParquetDataset,
LMDBDataset,
)
from graphnet.utilities.logging import Logger
from graphnet.data.dataloader import DataLoader
@@ -22,7 +23,10 @@ class GraphNeTDataModule(pl.LightningDataModule, Logger):
def __init__(
self,
dataset_reference: Union[
Type[SQLiteDataset], Type[ParquetDataset], Type[Dataset]
Type[SQLiteDataset],
Type[ParquetDataset],
Type[LMDBDataset],
Type[Dataset],
],
dataset_args: Dict[str, Any],
selection: Optional[Union[List[int], List[List[int]]]] = None,
@@ -250,6 +254,22 @@ def teardown(self) -> None: # type: ignore[override]
):
self._test_dataset._close_connection()

# Close LMDB connections
if hasattr(self, "_train_dataset") and isinstance(
self._train_dataset, LMDBDataset
):
self._train_dataset._close_connection()

if hasattr(self, "_val_dataset") and isinstance(
self._val_dataset, LMDBDataset
):
self._val_dataset._close_connection()

if hasattr(self, "_test_dataset") and isinstance(
self._test_dataset, LMDBDataset
):
self._test_dataset._close_connection()

return

def _create_dataloader(
@@ -317,14 +337,15 @@ def _validate_dataset_class(self) -> None:
"""Sanity checks on the dataset reference (self._dataset).

Checks whether the dataset is an instance of SQLiteDataset,
ParquetDataset, or Dataset. Raises a TypeError if an invalid
dataset type is detected, or if an EnsembleDataset is used.
ParquetDataset, LMDBDataset, or Dataset. Raises a TypeError if
an invalid dataset type is detected, or if an EnsembleDataset is
used.
"""
allowed_types = (SQLiteDataset, ParquetDataset, Dataset)
allowed_types = (SQLiteDataset, ParquetDataset, LMDBDataset, Dataset)
if self._dataset not in allowed_types:
raise TypeError(
"dataset_reference must be an instance "
"of SQLiteDataset, ParquetDataset, or Dataset."
"of SQLiteDataset, ParquetDataset, LMDBDataset, or Dataset."
)
if self._dataset is EnsembleDataset:
raise TypeError(
1 change: 1 addition & 0 deletions src/graphnet/data/dataset/__init__.py
@@ -12,6 +12,7 @@
)
from .parquet.parquet_dataset import ParquetDataset
from .sqlite.sqlite_dataset import SQLiteDataset
from .lmdb.lmdb_dataset import LMDBDataset

torch.multiprocessing.set_sharing_strategy("file_system")
