Merged
11 changes: 10 additions & 1 deletion .github/workflows/build-run-tests.yml
@@ -32,7 +32,16 @@ jobs:
      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: '3.x'
          python-version: '3.12'

      - name: Install uv
        uses: astral-sh/setup-uv@v7
        with:
          enable-cache: true

      - name: Run Data Loader Tests
        working-directory: integrations/python/dataloader
        run: make sync all

      - name: Install dependencies
        run: pip install -r scripts/python/requirements.txt
3 changes: 3 additions & 0 deletions .gitignore
@@ -15,6 +15,9 @@ hs_err_pid*
*.iws
.idea

# VS Code
.vscode/

# LinkedIn / Gradle / Hardware
.cache
.gradle
25 changes: 25 additions & 0 deletions integrations/python/dataloader/.gitignore
@@ -0,0 +1,25 @@
# Python
__pycache__/
*.py[cod]
*.so

# Virtual environments
.venv/
venv/

# Distribution / packaging
build/
dist/
*.egg-info/

# Testing
.pytest_cache/
.coverage
htmlcov/

# IDEs
.vscode/
.idea/

# OS
.DS_Store
54 changes: 54 additions & 0 deletions integrations/python/dataloader/CLAUDE.md
@@ -0,0 +1,54 @@
# Claude Instructions for OpenHouse DataLoader

## Project Overview

Python library for distributed data loading of OpenHouse tables. Uses DataFusion for query execution and PyIceberg for table access.

## Common Commands

```bash
make sync # Install dependencies
make check # Run lint + format checks
make test # Run tests
make all # Run all checks and tests
make format # Auto-format code
```

## Workflows
When making a change, run `make all` to ensure all checks and tests pass.

## Project Structure

```
src/openhouse/dataloader/
├── __init__.py # Public API exports
├── data_loader.py # Main API: OpenHouseDataLoader
├── data_loader_splits.py # DataLoaderSplits (iterable of splits)
├── data_loader_split.py # DataLoaderSplit (single callable split)
├── table_identifier.py # TableIdentifier dataclass
├── table_transformer.py # TableTransformer ABC (internal)
└── udf_registry.py # UDFRegistry ABC (internal)
```

## Public API

Only these are exported in `__init__.py`:
- `OpenHouseDataLoader` - Main entry point
- `DataLoaderContext` - Execution context and customization (table transformer, UDF registry)

Internal modules (TableIdentifier, TableTransformer, UDFRegistry) can be imported directly if needed, but the latter two expose DataFusion types.

## Code Style

- Uses `ruff` for linting and formatting
- Line length: 120
- Python 3.12+
- Use modern type hints (`list`, `dict`, `X | None` instead of `List`, `Dict`, `Optional`)
- Use `raise NotImplementedError` for unimplemented methods in concrete classes
- Use `pass` for abstract methods decorated with `@abstractmethod`
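
The last two bullets can be illustrated with a minimal sketch (the `Transformer` names here are hypothetical, not part of the package):

```python
from abc import ABC, abstractmethod


class Transformer(ABC):
    @abstractmethod
    def transform(self, row: dict) -> dict:
        """Abstract method: the body is just `pass` under @abstractmethod."""
        pass


class NoOpTransformer(Transformer):
    def transform(self, row: dict) -> dict:
        # Concrete class with an unimplemented method: raise rather than silently pass
        raise NotImplementedError
```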

## Versioning

- Version is in `pyproject.toml` (single source of truth)
- `__version__` in `__init__.py` reads from package metadata at runtime
- Major.minor aligns with OpenHouse monorepo, patch is independent
38 changes: 38 additions & 0 deletions integrations/python/dataloader/Makefile
@@ -0,0 +1,38 @@
.PHONY: help sync clean lint format format-check check test all

help:
	@echo "Available commands:"
	@echo "  make sync          - Sync dependencies using uv"
	@echo "  make lint          - Run ruff linter"
	@echo "  make format        - Format code with ruff"
	@echo "  make check         - Run all checks (lint + format check)"
	@echo "  make test          - Run tests with pytest"
	@echo "  make all           - Run all checks and tests"
	@echo "  make clean         - Clean build artifacts"

sync:
	uv sync --all-extras

lint:
	uv run ruff check src/ tests/

format:
	uv run ruff format src/ tests/

format-check:
	uv run ruff format --check src/ tests/

check: lint format-check

test:
	uv run pytest

all: check test

clean:
	rm -rf build/
	rm -rf dist/
	rm -rf *.egg-info
	rm -rf .venv/
	find . -type d -name __pycache__ -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete
32 changes: 32 additions & 0 deletions integrations/python/dataloader/README.md
@@ -0,0 +1,32 @@
# OpenHouse DataLoader

A Python library for distributed data loading of OpenHouse tables.

## Quickstart

```python
from openhouse.dataloader import OpenHouseDataLoader

loader = OpenHouseDataLoader("my_database", "my_table")

for split in loader:
    # Get table properties
    props = split.table_properties

    # Load data
    for batch in split:
        process(batch)
```

## Development

```bash
# Set up environment
make sync

# Run tests
make test

# See all available commands
make help
```
20 changes: 20 additions & 0 deletions integrations/python/dataloader/pyproject.toml
@@ -0,0 +1,20 @@
[project]
name = "openhouse-dataloader"
version = "0.0.1"
description = "A Python library for distributed data loading of OpenHouse tables"
readme = "README.md"
requires-python = ">=3.12"
dependencies = ["datafusion==51.0.0", "pyiceberg==0.10.0"]

[project.optional-dependencies]
dev = ["ruff>=0.9.0", "pytest>=8.0.0"]

[tool.ruff]
line-length = 120
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]

[tool.ruff.format]
quote-style = "double"
1 change: 1 addition & 0 deletions integrations/python/dataloader/src/openhouse/__init__.py
@@ -0,0 +1 @@
__path__ = __import__("pkgutil").extend_path(__path__, __name__)
@@ -0,0 +1,6 @@
from importlib.metadata import version

from openhouse.dataloader.data_loader import DataLoaderContext, OpenHouseDataLoader

__version__ = version("openhouse-dataloader")
__all__ = ["OpenHouseDataLoader", "DataLoaderContext"]
@@ -0,0 +1,57 @@
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass

from openhouse.dataloader.data_loader_split import DataLoaderSplit
from openhouse.dataloader.table_identifier import TableIdentifier
from openhouse.dataloader.table_transformer import TableTransformer
from openhouse.dataloader.udf_registry import UDFRegistry


@dataclass
class DataLoaderContext:
    """Context and customization for the DataLoader.

    Provides execution context (e.g. tenant, environment) and optional customizations
    like table transformations applied before loading data.

    Args:
        execution_context: Dictionary of execution context information (e.g. tenant, environment)
        table_transformer: Transformation to apply to the table before loading (e.g. column masking)
        udf_registry: UDFs required for the table transformation
    """

    execution_context: Mapping[str, str] | None = None
    table_transformer: TableTransformer | None = None
    udf_registry: UDFRegistry | None = None


class OpenHouseDataLoader:
    """An API for distributed data loading of OpenHouse tables"""

    def __init__(
        self,
        database: str,
        table: str,
        branch: str | None = None,
        columns: Sequence[str] | None = None,
        context: DataLoaderContext | None = None,
    ):
        """
        Args:
            database: Database name
            table: Table name
            branch: Optional branch name
            columns: Column names to load, or None to load all columns
            context: Data loader context
        """
        self._table = TableIdentifier(database, table, branch)
        self._columns = columns
        self._context = context or DataLoaderContext()

    def __iter__(self) -> Iterable[DataLoaderSplit]:
        """Iterate over data splits for distributed data loading of the table.

        Returns:
            Iterable of DataLoaderSplit, each containing table_properties
        """
        raise NotImplementedError
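
The two-level iteration contract implied by this API (the loader yields splits; each split yields record batches) can be exercised with stdlib stand-ins. `FakeSplit` and `FakeLoader` below are illustrative toys, not part of the package:

```python
class FakeSplit:
    """Stand-in for DataLoaderSplit: iterating yields batches (plain lists here)."""

    def __init__(self, batches, table_properties):
        self._batches = batches
        self.table_properties = table_properties

    def __iter__(self):
        return iter(self._batches)


class FakeLoader:
    """Stand-in for OpenHouseDataLoader: iterating yields splits."""

    def __init__(self, splits):
        self._splits = splits

    def __iter__(self):
        return iter(self._splits)


loader = FakeLoader([
    FakeSplit([[1, 2], [3]], {"format": "iceberg"}),
    FakeSplit([[4]], {"format": "iceberg"}),
])
rows = [row for split in loader for batch in split for row in batch]
# rows == [1, 2, 3, 4]
```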
@@ -0,0 +1,37 @@
from collections.abc import Iterator, Mapping

from datafusion.plan import LogicalPlan
from pyarrow import RecordBatch
from pyiceberg.io import FileScanTask

from openhouse.dataloader.udf_registry import UDFRegistry


class DataLoaderSplit:
    """A single data split"""

    def __init__(
        self,
        plan: LogicalPlan,
        file_scan_task: FileScanTask,
        udf_registry: UDFRegistry,
        table_properties: Mapping[str, str],
    ):
        self._plan = plan
        self._file_scan_task = file_scan_task
        self._udf_registry = udf_registry
        self._table_properties = table_properties

    @property
    def table_properties(self) -> Mapping[str, str]:
        """Properties of the table being loaded"""
        return self._table_properties

    def __iter__(self) -> Iterator[RecordBatch]:
        """Loads the split data, applying the prerequisite table
        transformation if one is provided.

        Returns:
            An iterator over the batches of data in the split
        """
        raise NotImplementedError
@@ -0,0 +1,21 @@
from dataclasses import dataclass


@dataclass
class TableIdentifier:
    """Identifier for a table in OpenHouse

    Args:
        database: Database name
        table: Table name
        branch: Optional branch name
    """

    database: str
    table: str
    branch: str | None = None

    def __str__(self) -> str:
        """Return the fully qualified table name."""
        base = f"{self.database}.{self.table}"
        return f"{base}.{self.branch}" if self.branch else base
@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from collections.abc import Mapping

from datafusion.context import SessionContext
from datafusion.dataframe import DataFrame

from openhouse.dataloader.table_identifier import TableIdentifier


class TableTransformer(ABC):
    """Interface for applying additional transformation logic to the data
    being loaded (e.g. column masking, row filtering)
    """

    @abstractmethod
    def transform(
        self, session_context: SessionContext, table: TableIdentifier, context: Mapping[str, str]
    ) -> DataFrame | None:
        """Applies transformation logic to the base table that is being loaded.

        Args:
            session_context: DataFusion session context used to build the transformation
            table: Identifier for the table
            context: Dictionary of context information (e.g. tenant, environment, etc.)

        Returns:
            The DataFrame representing the transformation. This is expected to read from the exact
            base table identifier passed in as input. If no transformation is required, None is returned.
        """
        pass
@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod

from datafusion.context import SessionContext


class UDFRegistry(ABC):
    """Used to register DataFusion UDFs"""

    @abstractmethod
    def register_udfs(self, session_context: SessionContext) -> None:
        """Registers UDFs with DataFusion

        Args:
            session_context: The session context to register the UDFs in
        """
        pass
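
The registry pattern can be sketched without DataFusion installed. `FakeSessionContext` below is a hypothetical stand-in for `datafusion.context.SessionContext` (its `register_udf` signature is simplified for illustration):

```python
class FakeSessionContext:
    """Hypothetical stand-in that records registered UDFs by name."""

    def __init__(self):
        self.udfs = {}

    def register_udf(self, name, fn):
        self.udfs[name] = fn


class UppercaseRegistry:
    """Sketch of a UDFRegistry implementation registering a single UDF."""

    def register_udfs(self, session_context) -> None:
        session_context.register_udf("to_upper", str.upper)


ctx = FakeSessionContext()
UppercaseRegistry().register_udfs(ctx)
# ctx.udfs["to_upper"]("openhouse") == "OPENHOUSE"
```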
Empty file.
3 changes: 3 additions & 0 deletions integrations/python/dataloader/tests/test_data_loader.py
@@ -0,0 +1,3 @@
def test_data_loader():
    """Test placeholder until real tests are added"""
    pass