Add dlt MCP server implementation #9481
Conversation
- Implement MCP server for dlt pipeline inspection and debugging
- Add 6 tools: inspect_pipeline, get_schema, query_destination, analyze_errors, scaffold_pipeline, review_schema_evolution
- Include pipeline discovery utilities and database connectors
- Add comprehensive test suite (26 tests, all passing)
- Add README with documentation and examples
- Support DuckDB and PostgreSQL destinations
- Read-only SQL query validation for safety
All Green is an AI agent that automatically: ✅ addresses code review comments ✅ fixes failing CI checks ✅ resolves merge conflicts
I have read the CLA Document and I hereby sign the CLA. You can retrigger this bot by commenting recheck in this Pull Request. Posted by the CLA Assistant Lite bot.
14 issues found across 26 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py:42">
P1: Read-only SQL validation misses SELECT ... INTO; PostgreSQL table-creating SELECT passes validation and can perform writes.</violation>
<violation number="2" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py:97">
P1: Connection errors are printed to stdout in an stdio MCP server, potentially corrupting protocol output.</violation>
<violation number="3" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py:122">
P2: LIMIT safeguard can be bypassed via comments/strings and does not cap oversized LIMITs, allowing unbounded fetches</violation>
</file>
<file name="mcp-servers/dlt-mcp/pyproject.toml">
<violation number="1" location="mcp-servers/dlt-mcp/pyproject.toml:42">
P1: Console script targets async entrypoint; calling `dlt-mcp` will not run the server because the coroutine is never awaited.</violation>
</file>
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py:56">
P2: Schema evolution always reports all current columns as added because previous schema is never loaded and defaults to empty, preventing the no-history fallback.</violation>
<violation number="2" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py:77">
P2: Computed migration_suggestions is unused and never returned, so the tool fails to deliver promised migration suggestions</violation>
<violation number="3" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py:92">
P2: Swallowing all exceptions hides schema processing failures and can return empty/partial evolution data instead of reporting the error.</violation>
</file>
<file name="mcp-servers/dlt-mcp/README.md">
<violation number="1" location="mcp-servers/dlt-mcp/README.md:39">
P2: Installation instructions reference non-existent requirements.txt, so the documented command will fail.</violation>
</file>
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py:76">
P2: Exceptions during load inspection are swallowed, returning partial/misleading results without any error signal.</violation>
</file>
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py:253">
P2: Python 3.8 target but uses 3.9-only `list[str]` annotation; module import will fail on 3.8</violation>
</file>
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py:73">
P2: Exceptions during error log parsing are swallowed, causing failed scans to return misleading "no_errors_found" and hiding real failures.</violation>
</file>
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py:35">
P1: Read-only SQL validation is bypassable: multi-statement queries with unblocked commands (e.g., `COPY`, `VACUUM`) pass `validate_sql_query` and would execute despite the read-only check.</violation>
<violation number="2" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py:90">
P2: Raw exception details are returned to clients, potentially leaking driver connection info and file paths.</violation>
</file>
<file name="mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py">
<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py:30">
P2: Pipeline name validation permits `.`/`..`, enabling path traversal when names are joined into pipeline directories.</violation>
</file>
Since this is your first cubic review, here's how it works:
- cubic automatically reviews your code and comments on bugs and improvements
- Teach cubic by replying to its comments. cubic learns from your replies and gets better over time
- Ask questions if you need clarification on any suggestion
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
P1: Read-only SQL validation misses SELECT ... INTO; PostgreSQL table-creating SELECT passes validation and can perform writes.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py, line 42:
<comment>Read-only SQL validation misses SELECT ... INTO; PostgreSQL table-creating SELECT passes validation and can perform writes.</comment>
<file context>
@@ -0,0 +1,161 @@
+ return False, f"Query contains forbidden keyword: {keyword}. Only SELECT queries are allowed."
+
+ # Must start with SELECT
+ if not query_upper.startswith('SELECT'):
+ return False, "Query must be a SELECT statement."
+
</file context>
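A possible tightening, sketched below: after the keyword blocklist, also reject the INTO keyword inside an otherwise valid SELECT. The function name and return convention here are illustrative assumptions, not the PR's actual code; a production fix might instead lean on a real SQL parser.

```python
import re

_INTO_RE = re.compile(r"\bINTO\b", re.IGNORECASE)

def is_read_only_select(query: str):
    """Reject non-SELECT statements and SELECT ... INTO (which creates a table on PostgreSQL)."""
    stripped = query.strip().rstrip(";")
    if not stripped.upper().startswith("SELECT"):
        return False, "Query must be a SELECT statement."
    if _INTO_RE.search(stripped):
        # Crude but safe: may also reject string literals containing the word INTO.
        return False, "SELECT ... INTO is not allowed; it writes a new table."
    return True, ""
```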
P1: Connection errors are printed to stdout in an stdio MCP server, potentially corrupting protocol output.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py, line 97:
<comment>Connection errors are printed to stdout in an stdio MCP server, potentially corrupting protocol output.</comment>
<file context>
@@ -0,0 +1,161 @@
+
+ except Exception as e:
+ # Log error but don't raise - return None to indicate failure
+ print(f"Failed to connect to destination: {e}")
+ return None
+
</file context>
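One conventional fix, as a sketch: send diagnostics to stderr via logging so stdout carries only JSON-RPC frames. Here connect_to_destination is a hypothetical stand-in for the PR's connection code.

```python
import logging
import sys

# Route all diagnostics to stderr; stdout is reserved for the MCP protocol.
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
logger = logging.getLogger("dlt_mcp")

def get_connection():
    try:
        return connect_to_destination()  # hypothetical helper standing in for the PR's code
    except Exception as exc:
        logger.warning("Failed to connect to destination: %s", exc)
        return None
```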
| ] | ||
|
|
||
| [project.scripts] | ||
| dlt-mcp = "dlt_mcp.server:main" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P1: Console script targets async entrypoint; calling dlt-mcp will not run the server because the coroutine is never awaited.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/pyproject.toml, line 42:
<comment>Console script targets async entrypoint; calling `dlt-mcp` will not run the server because the coroutine is never awaited.</comment>
<file context>
@@ -0,0 +1,65 @@
+]
+
+[project.scripts]
+dlt-mcp = "dlt_mcp.server:main"
+
+[build-system]
</file context>
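The usual pattern is a thin synchronous wrapper that the console script targets. A sketch, assuming the async entrypoint is a coroutine named serve in dlt_mcp/server.py:

```python
# dlt_mcp/server.py (sketch)
import asyncio

async def serve() -> None:
    ...  # existing async code that starts the stdio server

def main() -> None:
    """Synchronous entrypoint so `dlt-mcp = "dlt_mcp.server:main"` actually runs the server."""
    asyncio.run(serve())
```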
| } | ||
|
|
||
| # Validate query is read-only | ||
| is_valid, error = validate_sql_query(query) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P1: Read-only SQL validation is bypassable: multi-statement queries with unblocked commands (e.g., COPY, VACUUM) pass validate_sql_query and would execute despite the read-only check.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py, line 35:
<comment>Read-only SQL validation is bypassable: multi-statement queries with unblocked commands (e.g., `COPY`, `VACUUM`) pass `validate_sql_query` and would execute despite the read-only check.</comment>
<file context>
@@ -0,0 +1,107 @@
+ }
+
+ # Validate query is read-only
+ is_valid, error = validate_sql_query(query)
+ if not is_valid:
+ return {
</file context>
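A sketch of one mitigation: parse the query and require exactly one statement before any keyword checks run. This assumes the sqlparse package is added as a dependency; splitting on `;` by hand would be fooled by literals and comments.

```python
import sqlparse  # assumed additional dependency for this sketch

def require_single_statement(query: str):
    """Block 'SELECT 1; COPY ...' style smuggling of unblocked commands."""
    statements = [s for s in sqlparse.parse(query) if str(s).strip()]
    if len(statements) != 1:
        return False, "Exactly one SELECT statement is allowed."
    return True, ""
```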
P2: LIMIT safeguard can be bypassed via comments/strings and does not cap oversized LIMITs, allowing unbounded fetches
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py, line 122:
<comment>LIMIT safeguard can be bypassed via comments/strings and does not cap oversized LIMITs, allowing unbounded fetches</comment>
<file context>
@@ -0,0 +1,161 @@
+
+ # Add LIMIT if not present and limit is specified
+ query_upper = query.upper()
+ if limit > 0 and 'LIMIT' not in query_upper:
+ query = f"{query.rstrip(';')} LIMIT {limit}"
+
</file context>
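Rather than substring-matching 'LIMIT', the validated query can be wrapped in a subselect so the cap applies unconditionally and oversized limits are clamped. A sketch; MAX_ROWS and the helper name are illustrative:

```python
MAX_ROWS = 1000  # illustrative server-side ceiling

def with_row_cap(query: str, limit: int) -> str:
    effective = min(limit, MAX_ROWS) if limit > 0 else MAX_ROWS
    # Valid on DuckDB and PostgreSQL for any SELECT; comments or string
    # literals in the inner query can no longer defeat the cap.
    return f"SELECT * FROM ({query.strip().rstrip(';')}) AS _capped LIMIT {effective}"
```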
P2: Exceptions during load inspection are swallowed, returning partial/misleading results without any error signal.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py, line 76:
<comment>Exceptions during load inspection are swallowed, returning partial/misleading results without any error signal.</comment>
<file context>
@@ -0,0 +1,141 @@
+ finish = datetime.fromisoformat(load_data["finished_at"].replace('Z', '+00:00'))
+ duration = (finish - start).total_seconds()
+ load_data["duration_seconds"] = duration
+ except Exception:
+ pass
+
</file context>
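A sketch of surfacing the failure instead of dropping it. The load_data shape is taken from the diff context; started_at and the warnings field are inferred assumptions:

```python
from datetime import datetime
from typing import Any, Dict

def add_duration(load_data: Dict[str, Any]) -> None:
    try:
        start = datetime.fromisoformat(load_data["started_at"].replace("Z", "+00:00"))
        finish = datetime.fromisoformat(load_data["finished_at"].replace("Z", "+00:00"))
        load_data["duration_seconds"] = (finish - start).total_seconds()
    except (KeyError, ValueError) as exc:
        # Narrow the except clause and record why the duration is missing.
        load_data["duration_seconds"] = None
        load_data.setdefault("warnings", []).append(f"could not compute duration: {exc}")
```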
| } | ||
|
|
||
|
|
||
| def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> list[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Python 3.8 target but uses 3.9-only list[str] annotation; module import will fail on 3.8
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py, line 253:
<comment>Python 3.8 target but uses 3.9-only `list[str]` annotation; module import will fail on 3.8</comment>
<file context>
@@ -0,0 +1,268 @@
+ }
+
+
+def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> list[str]:
+ """Get next steps for setting up the pipeline."""
+ steps = [
</file context>
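Two 3.8-compatible options, sketched below: postpone annotation evaluation, or spell the annotation with typing.List.

```python
from __future__ import annotations  # option 1: must be the module's first statement;
                                    # makes `list[str]` a string at runtime on 3.8

# option 2: use the typing module instead
from typing import List

def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> List[str]:
    """Get next steps for setting up the pipeline."""
    ...
```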
P2: Exceptions during error log parsing are swallowed, causing failed scans to return misleading "no_errors_found" and hiding real failures.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py, line 73:
<comment>Exceptions during error log parsing are swallowed, causing failed scans to return misleading "no_errors_found" and hiding real failures.</comment>
<file context>
@@ -0,0 +1,150 @@
+ stack_trace=stack_trace,
+ suggested_fix=_suggest_fix(error_type, message)
+ ))
+ except Exception:
+ pass
+ except Exception:
</file context>
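A sketch of the same idea applied here: collect per-file parse failures and report a distinct status instead of "no_errors_found". Names such as parse_error_log and the result keys are assumptions, not the PR's structures:

```python
from pathlib import Path
from typing import Iterable

def scan_error_logs(log_files: Iterable[Path]) -> dict:
    errors, parse_failures = [], []
    for log_file in log_files:
        try:
            errors.extend(parse_error_log(log_file))  # hypothetical per-file parser
        except Exception as exc:
            parse_failures.append({"file": str(log_file), "reason": str(exc)})
    result = {"errors": errors, "parse_failures": parse_failures}
    if not errors:
        # A failed scan is reported as incomplete, not as a clean run.
        result["status"] = "scan_incomplete" if parse_failures else "no_errors_found"
    return result
```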
P2: Raw exception details are returned to clients, potentially leaking driver connection info and file paths.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py, line 90:
<comment>Raw exception details are returned to clients, potentially leaking driver connection info and file paths.</comment>
<file context>
@@ -0,0 +1,107 @@
+ except ValueError as e:
+ # Query validation error
+ return {
+ "error": str(e),
+ "error_type": "ValidationError",
+ "query": query,
</file context>
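A sketch of one possible split: echo validation messages verbatim (they originate in the server's own code), but log unexpected exceptions server-side and return only a generic message plus the exception type. execute_query is a hypothetical executor:

```python
import logging

logger = logging.getLogger("dlt_mcp")

def run_query_safely(query: str) -> dict:
    try:
        return {"rows": execute_query(query)}  # hypothetical executor
    except ValueError as exc:
        # Our own validation text: safe to return to the client.
        return {"error": str(exc), "error_type": "ValidationError"}
    except Exception as exc:
        logger.exception("Query failed")  # full traceback stays in server logs
        return {"error": "Query execution failed.", "error_type": type(exc).__name__}
```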
P2: Pipeline name validation permits ./.., enabling path traversal when names are joined into pipeline directories.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py, line 30:
<comment>Pipeline name validation permits `.`/`..`, enabling path traversal when names are joined into pipeline directories.</comment>
<file context>
@@ -0,0 +1,54 @@
+ if not pipeline_name:
+ return None
+
+ return pipeline_name
+
+
</file context>
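A sketch of an allowlist check that rejects dot-only names and path separators outright; the exact pattern is an assumption about what dlt pipeline names should look like:

```python
import re

_NAME_RE = re.compile(r"^[A-Za-z0-9_][A-Za-z0-9_-]*$")

def validate_pipeline_name(pipeline_name):
    # Rejects "", ".", "..", "a/b", "..\\x" before the name touches any path join.
    if not pipeline_name or not _NAME_RE.match(pipeline_name):
        return None
    return pipeline_name
```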
Hey, please review this once more: the failure is not a dlt MCP problem, it is an environment/network-related one. I have fixed the code, so please review it again and merge my branch.

dlt MCP Server Architecture Plan
Overview
Build a Python MCP server in mcp-servers/dlt-mcp/ that provides read-only tooling for AI agents to interact with local dlt pipelines. The server will use stdio transport and follow MCP SDK patterns.
1. Folder Structure
2. Tool Definitions
Tool 1: inspect_pipeline
Purpose: Inspect dlt pipeline execution details

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Name of pipeline (auto-discovers if omitted) |
| working_dir | string | No | Directory to search for pipelines |

Output: JSON with load info, timing, file sizes, rows loaded, last run status
Tool 2: get_schema
Purpose: Retrieve pipeline schema metadata

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| table_name | string | No | Specific table (all tables if omitted) |
| include_hints | boolean | No | Include dlt hints/annotations |

Output: JSON with tables, columns, data types, primary keys, constraints
Tool 3: query_destination
Purpose: Query data from destination databases (DuckDB, PostgreSQL)

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| query | string | Yes | SQL SELECT query (read-only enforced) |
| limit | integer | No | Max rows to return (default: 100) |

Output: JSON with query results, column names, row count
Safety: Only SELECT statements allowed; query validation prevents mutations
Tool 4: analyze_errors
Purpose: Analyze pipeline load errors and explain root causes

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| last_n_runs | integer | No | Number of recent runs to analyze |

Output: JSON with error details, affected tables, suggested fixes, stack traces
Tool 5: scaffold_pipeline
Purpose: Generate scaffold code for a new dlt pipeline

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| source_type | string | Yes | Source type: "rest_api", "sql_database", "filesystem" |
| destination | string | Yes | Destination: "duckdb", "postgres", "bigquery", etc. |
| pipeline_name | string | Yes | Name for the new pipeline |

Output: JSON with generated code snippets, file structure, next steps
Tool 6: review_schema_evolution
Purpose: Review and summarize schema changes across pipeline runs

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| compare_runs | integer | No | Number of runs to compare |

Output: JSON with added/removed/modified columns, type changes, migration suggestions
3. Architecture Diagram
```mermaid
flowchart TB
    subgraph ContinueAgent [Continue Agent]
        AI[AI Model]
        MCP_Client[MCP Client]
    end
    subgraph DltMcpServer [dlt MCP Server - stdio]
        Server[server.py]
        Tools[Tool Handlers]
        Utils[Utilities]
    end
    subgraph LocalPipeline [Local dlt Pipeline]
        PipelineState[Pipeline State]
        Schema[Schema Files]
        Destination[(Destination DB)]
    end
    AI --> MCP_Client
    MCP_Client -->|stdio| Server
    Server --> Tools
    Tools --> Utils
    Utils --> PipelineState
    Utils --> Schema
    Utils --> Destination
```
4. Data Flow
1. Tool Invocation: Continue agent calls MCP tool via stdio JSON-RPC
2. Request Handling: server.py routes to the appropriate tool handler
3. Pipeline Discovery: pipeline_discovery.py locates dlt pipelines in the working directory
4. Data Access: Tool reads from pipeline state, schema files, or the destination database
5. Response: Structured JSON returned to the agent
5. Key Design Decisions
| Decision | Rationale |
|----------|-----------|
| stdio transport only | Simpler deployment; matches local-only scope |
| Read-only enforcement | Safety constraint; SQL validation layer |
| Auto-discovery | Better UX; finds pipelines in cwd automatically |
| dlt library direct usage | Official API ensures compatibility |
| Structured JSON output | Easy for AI to parse and reason about |
| Separate tool modules | Clean separation of concerns; testability |
6. Dependencies
7. Testing Strategy
| Test Type | Scope | Tools |
|-----------|-------|-------|
| Unit Tests | Individual tool handlers | pytest, pytest-mock |
| Integration Tests | Full tool flow with mock pipelines | pytest, temp directories |
| Fixture Pipelines | Pre-built dlt pipelines for testing | conftest.py fixtures |
Test Coverage Goals:
8. Documentation Outline
README.md
Tool Reference (in README or separate doc)
9. MCP Server Configuration Example
10. Implementation Order
1. inspect_pipeline tool (foundational)
2. get_schema tool
3. query_destination tool with SQL validation
4. analyze_errors tool
5. scaffold_pipeline tool
6. review_schema_evolution tool

Powered by Continue
Summary by cubic
Implements a Python MCP server for dlt to inspect, query, and debug local pipelines over stdio with strict read-only safety. Adds six tools, destination connectors, docs, and tests to make pipeline state and schema accessible to MCP clients.
New Features
Dependencies
Written for commit f82fff9. Summary will update on new commits.