
Conversation


@jwrhw7tueydwtt7575g jwrhw7tueydwtt7575g commented Jan 11, 2026

dlt MCP Server Architecture Plan

Overview

Build a Python MCP server in mcp-servers/dlt-mcp/ that provides read-only tooling for AI agents to interact with local dlt pipelines. The server will use stdio transport and follow MCP SDK patterns.


1. Folder Structure

```text
mcp-servers/
└── dlt-mcp/
    ├── pyproject.toml           # Package config, dependencies
    ├── README.md                # Documentation
    ├── src/
    │   └── dlt_mcp/
    │       ├── __init__.py
    │       ├── server.py        # MCP server entry point
    │       ├── tools/           # Tool implementations
    │       │   ├── __init__.py
    │       │   ├── pipeline_inspect.py
    │       │   ├── schema_metadata.py
    │       │   ├── query_data.py
    │       │   ├── error_analysis.py
    │       │   ├── pipeline_scaffold.py
    │       │   └── schema_evolution.py
    │       ├── utils/           # Shared utilities
    │       │   ├── __init__.py
    │       │   ├── pipeline_discovery.py
    │       │   ├── db_connector.py
    │       │   └── validation.py
    │       └── types.py         # Type definitions
    └── tests/
        ├── __init__.py
        ├── conftest.py          # Pytest fixtures
        ├── test_pipeline_inspect.py
        ├── test_schema_metadata.py
        ├── test_query_data.py
        └── test_error_analysis.py
```

2. Tool Definitions

Tool 1: inspect_pipeline

Purpose: Inspect dlt pipeline execution details

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Name of pipeline (auto-discovers if omitted) |
| working_dir | string | No | Directory to search for pipelines |

Output: JSON with load info, timing, file sizes, rows loaded, last run status
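
For illustration, a minimal sketch of how this tool could pull run details through dlt's public API — `dlt.attach` and `last_trace` are dlt's documented way to attach to a local pipeline, though the exact trace fields may vary across dlt versions:

```python
import dlt

def summarize_last_run(pipeline_name: str) -> dict:
    """Attach to an existing local pipeline and summarize its most recent run."""
    pipeline = dlt.attach(pipeline_name=pipeline_name)
    trace = pipeline.last_trace  # None if the pipeline has never run
    if trace is None:
        return {"pipeline_name": pipeline_name, "status": "never_run"}
    return {
        "pipeline_name": pipeline_name,
        "started_at": str(trace.started_at),
        "finished_at": str(trace.finished_at),
        "steps": [step.step for step in trace.steps],  # extract / normalize / load
    }
```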


Tool 2: get_schema

Purpose: Retrieve pipeline schema metadata

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| table_name | string | No | Specific table (all tables if omitted) |
| include_hints | boolean | No | Include dlt hints/annotations |

Output: JSON with tables, columns, data types, primary keys, constraints
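
For illustration, a sketch of the column extraction via `pipeline.default_schema`; the dict shape mirrors dlt's stored table schema, where `data_type`, `nullable`, and `primary_key` are standard dlt column hints:

```python
import dlt

def get_table_schema(pipeline_name: str, table_name: str) -> dict:
    """Return column names, types, and key hints for one table of a local pipeline."""
    pipeline = dlt.attach(pipeline_name=pipeline_name)
    table = pipeline.default_schema.tables[table_name]  # KeyError if table is unknown
    return {
        "table": table_name,
        "columns": {
            name: {
                "data_type": col.get("data_type"),
                "nullable": col.get("nullable", True),
                "primary_key": col.get("primary_key", False),
            }
            for name, col in table.get("columns", {}).items()
        },
    }
```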


Tool 3: query_destination

Purpose: Query data from destination databases (DuckDB, PostgreSQL)

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| query | string | Yes | SQL SELECT query (read-only enforced) |
| limit | integer | No | Max rows to return (default: 100) |

Output: JSON with query results, column names, row count

Safety: Only SELECT statements allowed; query validation prevents mutations
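
For illustration, a minimal sketch of that validation layer. It is keyword-based and therefore approximate — splitting on `;` can misfire inside string literals, and a parser-based check (see the review discussion below) is stronger:

```python
import re
from typing import Tuple

FORBIDDEN = ("INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE",
             "TRUNCATE", "GRANT", "REVOKE", "ATTACH", "COPY", "INTO")

def validate_sql_query(query: str) -> Tuple[bool, str]:
    """Allow exactly one SELECT statement; reject anything that could mutate state."""
    # Strip SQL comments so forbidden keywords cannot hide behind them.
    stripped = re.sub(r"--[^\n]*|/\*.*?\*/", " ", query, flags=re.DOTALL)
    statements = [s for s in (p.strip() for p in stripped.split(";")) if s]
    if len(statements) != 1:
        return False, "Exactly one statement is allowed."
    stmt = statements[0].upper()
    if not stmt.startswith("SELECT"):
        return False, "Query must be a SELECT statement."
    for kw in FORBIDDEN:
        if re.search(rf"\b{kw}\b", stmt):
            return False, f"Query contains forbidden keyword: {kw}."
    return True, ""
```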


Tool 4: analyze_errors

Purpose: Analyze pipeline load errors and explain root causes

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| last_n_runs | integer | No | Number of recent runs to analyze |

Output: JSON with error details, affected tables, suggested fixes, stack traces


Tool 5: scaffold_pipeline

Purpose: Generate scaffold code for a new dlt pipeline

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| source_type | string | Yes | Source type: "rest_api", "sql_database", "filesystem" |
| destination | string | Yes | Destination: "duckdb", "postgres", "bigquery", etc. |
| pipeline_name | string | Yes | Name for the new pipeline |

Output: JSON with generated code snippets, file structure, next steps


Tool 6: review_schema_evolution

Purpose: Review and summarize schema changes across pipeline runs

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| pipeline_name | string | No | Pipeline name |
| compare_runs | integer | No | Number of runs to compare |

Output: JSON with added/removed/modified columns, type changes, migration suggestions
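
For illustration, the core comparison is plain set arithmetic over stored column mappings (a sketch; `old` and `new` stand for two versions of the same table's `columns` dict in dlt's schema format):

```python
def diff_table_columns(old: dict, new: dict) -> dict:
    """Diff two versions of a table's column mapping."""
    old_cols, new_cols = set(old), set(new)
    return {
        "added": sorted(new_cols - old_cols),
        "removed": sorted(old_cols - new_cols),
        "type_changed": sorted(
            name for name in old_cols & new_cols
            if old[name].get("data_type") != new[name].get("data_type")
        ),
    }
```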


3. Architecture Diagram

```mermaid
flowchart TB
    subgraph ContinueAgent [Continue Agent]
        AI[AI Model]
        MCP_Client[MCP Client]
    end

    subgraph DltMcpServer [dlt MCP Server - stdio]
        Server[server.py]
        Tools[Tool Handlers]
        Utils[Utilities]
    end

    subgraph LocalPipeline [Local dlt Pipeline]
        PipelineState[Pipeline State]
        Schema[Schema Files]
        Destination[(Destination DB)]
    end

    AI --> MCP_Client
    MCP_Client -->|stdio| Server
    Server --> Tools
    Tools --> Utils
    Utils --> PipelineState
    Utils --> Schema
    Utils --> Destination
```

4. Data Flow

  1. Tool Invocation: Continue agent calls MCP tool via stdio JSON-RPC

  2. Request Handling: server.py routes to appropriate tool handler

  3. Pipeline Discovery: pipeline_discovery.py locates dlt pipelines in working directory

  4. Data Access: Tool reads from:
     - Pipeline state files (`.dlt/` directory)
     - Schema metadata (stored in pipeline)
     - Destination database (read-only queries)
  5. Response: Structured JSON returned to agent
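
For illustration, a sketch of the `server.py` skeleton following the low-level MCP Python SDK pattern; `TOOL_HANDLERS` and the inline input schema are placeholders for this plan, not final code:

```python
import asyncio
import json

import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server

server = Server("dlt-mcp")

# Placeholder registry mapping tool names to the handlers in tools/.
TOOL_HANDLERS: dict = {}

@server.list_tools()
async def list_tools() -> list[types.Tool]:
    # Each tool advertises a JSON Schema describing its parameters.
    return [
        types.Tool(
            name="inspect_pipeline",
            description="Inspect dlt pipeline execution details",
            inputSchema={"type": "object", "properties": {}},
        ),
        # ... the remaining five tools ...
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    result = TOOL_HANDLERS[name](**arguments)  # route to the matching handler
    return [types.TextContent(type="text", text=json.dumps(result))]

async def serve() -> None:
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream,
                         server.create_initialization_options())

def main() -> None:
    """Synchronous entry point, suitable for a console script."""
    asyncio.run(serve())
```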


5. Key Design Decisions

| Decision | Rationale |
|----------|-----------|
| stdio transport only | Simpler deployment; matches local-only scope |
| Read-only enforcement | Safety constraint; SQL validation layer |
| Auto-discovery | Better UX; finds pipelines in cwd automatically |
| dlt library direct usage | Official API ensures compatibility |
| Structured JSON output | Easy for AI to parse and reason about |
| Separate tool modules | Clean separation of concerns; testability |


6. Dependencies

```toml
[project]
dependencies = [
    "mcp>=1.0.0",           # MCP Python SDK
    "dlt>=0.5.0",           # dlt library
    "duckdb>=0.9.0",        # DuckDB support
    "psycopg2-binary",      # PostgreSQL support (optional)
    "pydantic>=2.0",        # Input validation
]
```

7. Testing Strategy

| Test Type | Scope | Tools |
|-----------|-------|-------|
| Unit Tests | Individual tool handlers | pytest, pytest-mock |
| Integration Tests | Full tool flow with mock pipelines | pytest, temp directories |
| Fixture Pipelines | Pre-built dlt pipelines for testing | conftest.py fixtures |

Test Coverage Goals:

  • Tool input validation (edge cases, invalid inputs)
  • Pipeline discovery (multiple pipelines, no pipelines)
  • Query safety (SQL injection prevention)
  • Error handling (missing pipelines, DB connection failures)
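
For illustration, the fixture pipelines could be built per test in a temp directory (a sketch; `pipelines_dir` is dlt's option for keeping pipeline state out of the user's home directory):

```python
import dlt
import pytest

@pytest.fixture
def duckdb_pipeline(tmp_path):
    """A small dlt pipeline loaded into a temp DuckDB file, isolated per test."""
    pipeline = dlt.pipeline(
        pipeline_name="test_pipeline",
        destination=dlt.destinations.duckdb(str(tmp_path / "test.duckdb")),
        dataset_name="test_data",
        pipelines_dir=str(tmp_path / "pipelines"),  # keep state inside tmp_path
    )
    pipeline.run(
        [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}],
        table_name="items",
    )
    return pipeline
```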

8. Documentation Outline

  1. README.md
     - Quick start / installation
     - Configuration (MCP server config YAML)
     - Available tools with examples
     - Supported destinations

  2. Tool Reference (in README or separate doc)
     - Each tool: description, parameters, example output
     - Error codes and troubleshooting

9. MCP Server Configuration Example

```yaml
name: dlt MCP Server
version: 0.0.1
schema: v1
mcpServers:
  - name: dlt-mcp
    command: uvx
    args:
      - "dlt-mcp"
```

10. Implementation Order

  1. Project setup (pyproject.toml, folder structure)
  2. Core server skeleton with MCP SDK
  3. Pipeline discovery utility
  4. inspect_pipeline tool (foundational)
  5. get_schema tool
  6. query_destination tool with SQL validation
  7. analyze_errors tool
  8. scaffold_pipeline tool
  9. review_schema_evolution tool
  10. Tests and documentation

Continue Tasks

| Status | Task |
|--------|------|
| ▶️ Queued | Create GitHub Issue (OS) |


Summary by cubic

Implements a Python MCP server for dlt to inspect, query, and debug local pipelines over stdio with strict read-only safety. Adds six tools, destination connectors, docs, and tests to make pipeline state and schema accessible to MCP clients.

  • New Features

    • MCP server with stdio transport and CLI script: dlt-mcp
    • Tools: inspect_pipeline, get_schema, query_destination (SELECT-only), analyze_errors, scaffold_pipeline, review_schema_evolution
    • Pipeline discovery utilities and input validation
    • DuckDB and PostgreSQL connectors with query safety checks
    • README with examples and a comprehensive test suite
  • Dependencies

    • Runtime: mcp, dlt, duckdb, psycopg2-binary, pydantic
    • Dev: pytest, pytest-mock, pytest-asyncio, black, ruff

Written for commit f82fff9. Summary will update on new commits.

- Implement MCP server for dlt pipeline inspection and debugging
- Add 6 tools: inspect_pipeline, get_schema, query_destination, analyze_errors, scaffold_pipeline, review_schema_evolution
- Include pipeline discovery utilities and database connectors
- Add comprehensive test suite (26 tests, all passing)
- Add README with documentation and examples
- Support DuckDB and PostgreSQL destinations
- Read-only SQL query validation for safety
@jwrhw7tueydwtt7575g jwrhw7tueydwtt7575g requested a review from a team as a code owner January 11, 2026 09:30
@jwrhw7tueydwtt7575g jwrhw7tueydwtt7575g requested review from sestinj and removed request for a team January 11, 2026 09:30
@continue
Copy link
Contributor

continue bot commented Jan 11, 2026

All Green - Keep your PRs mergeable

All Green is an AI agent that automatically:

✅ Addresses code review comments

✅ Fixes failing CI checks

✅ Resolves merge conflicts

1 similar comment posted by @continue-staging.

@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Jan 11, 2026
@github-actions


Thank you for your submission, we really appreciate it. Like many open-source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution. You can sign the CLA by posting a pull request comment in the format below.


I have read the CLA Document and I hereby sign the CLA


You can retrigger this bot by commenting recheck in this Pull Request. Posted by the CLA Assistant Lite bot.


@cubic-dev-ai cubic-dev-ai bot left a comment


14 issues found across 26 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py:42">
P1: Read-only SQL validation misses SELECT ... INTO; PostgreSQL table-creating SELECT passes validation and can perform writes.</violation>

<violation number="2" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py:97">
P1: Connection errors are printed to stdout in an stdio MCP server, potentially corrupting protocol output.</violation>

<violation number="3" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py:122">
P2: LIMIT safeguard can be bypassed via comments/strings and does not cap oversized LIMITs, allowing unbounded fetches</violation>
</file>

<file name="mcp-servers/dlt-mcp/pyproject.toml">

<violation number="1" location="mcp-servers/dlt-mcp/pyproject.toml:42">
P1: Console script targets async entrypoint; calling `dlt-mcp` will not run the server because the coroutine is never awaited.</violation>
</file>

<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py:56">
P2: Schema evolution always reports all current columns as added because previous schema is never loaded and defaults to empty, preventing the no-history fallback.</violation>

<violation number="2" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py:77">
P2: Computed migration_suggestions is unused and never returned, so the tool fails to deliver promised migration suggestions</violation>

<violation number="3" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py:92">
P2: Swallowing all exceptions hides schema processing failures and can return empty/partial evolution data instead of reporting the error.</violation>
</file>

<file name="mcp-servers/dlt-mcp/README.md">

<violation number="1" location="mcp-servers/dlt-mcp/README.md:39">
P2: Installation instructions reference non-existent requirements.txt, so the documented command will fail.</violation>
</file>

<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py:76">
P2: Exceptions during load inspection are swallowed, returning partial/misleading results without any error signal.</violation>
</file>

<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py:253">
P2: Python 3.8 target but uses 3.9-only `list[str]` annotation; module import will fail on 3.8</violation>
</file>

<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py:73">
P2: Exceptions during error log parsing are swallowed, causing failed scans to return misleading "no_errors_found" and hiding real failures.</violation>
</file>

<file name="mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py:35">
P1: Read-only SQL validation is bypassable: multi-statement queries with unblocked commands (e.g., `COPY`, `VACUUM`) pass `validate_sql_query` and would execute despite the read-only check.</violation>

<violation number="2" location="mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py:90">
P2: Raw exception details are returned to clients, potentially leaking driver connection info and file paths.</violation>
</file>

<file name="mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py">

<violation number="1" location="mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py:30">
P2: Pipeline name validation permits `.`/`..`, enabling path traversal when names are joined into pipeline directories.</violation>
</file>

Since this is your first cubic review, here's how it works:

  • cubic automatically reviews your code and comments on bugs and improvements
  • Teach cubic by replying to its comments. cubic learns from your replies and gets better over time
  • Ask questions if you need clarification on any suggestion

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

return False, f"Query contains forbidden keyword: {keyword}. Only SELECT queries are allowed."

# Must start with SELECT
if not query_upper.startswith('SELECT'):

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P1: Read-only SQL validation misses SELECT ... INTO; PostgreSQL table-creating SELECT passes validation and can perform writes.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py, line 42:

<comment>Read-only SQL validation misses SELECT ... INTO; PostgreSQL table-creating SELECT passes validation and can perform writes.</comment>

<file context>
@@ -0,0 +1,161 @@
+            return False, f"Query contains forbidden keyword: {keyword}. Only SELECT queries are allowed."
+
+    # Must start with SELECT
+    if not query_upper.startswith('SELECT'):
+        return False, "Query must be a SELECT statement."
+
</file context>
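
One possible hardening, sketched under the assumption that a SQL parser such as sqlglot were added as a dependency (parsing addresses both this and the multi-statement bypass flagged below):

```python
import sqlglot
from sqlglot import exp

def is_read_only_select(query: str) -> bool:
    """Parse and verify the query is exactly one plain SELECT (no SELECT ... INTO)."""
    try:
        statements = sqlglot.parse(query)
    except sqlglot.errors.ParseError:
        return False
    if len(statements) != 1 or statements[0] is None:
        return False
    stmt = statements[0]
    return isinstance(stmt, exp.Select) and stmt.find(exp.Into) is None
```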


except Exception as e:
# Log error but don't raise - return None to indicate failure
print(f"Failed to connect to destination: {e}")

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P1: Connection errors are printed to stdout in an stdio MCP server, potentially corrupting protocol output.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py, line 97:

<comment>Connection errors are printed to stdout in an stdio MCP server, potentially corrupting protocol output.</comment>

<file context>
@@ -0,0 +1,161 @@
+
+    except Exception as e:
+        # Log error but don't raise - return None to indicate failure
+        print(f"Failed to connect to destination: {e}")
+        return None
+
</file context>
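
A minimal fix is to route diagnostics to stderr, since stdout carries the JSON-RPC stream on stdio transport (a sketch; the logger name is illustrative):

```python
import logging
import sys

# Send all server diagnostics to stderr; stdout is reserved for JSON-RPC frames.
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
logger = logging.getLogger("dlt_mcp")

def report_connection_failure(exc: Exception) -> None:
    """Drop-in replacement for the print() call above."""
    logger.error("Failed to connect to destination: %s", exc)
```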

]

[project.scripts]
dlt-mcp = "dlt_mcp.server:main"

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P1: Console script targets async entrypoint; calling dlt-mcp will not run the server because the coroutine is never awaited.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/pyproject.toml, line 42:

<comment>Console script targets async entrypoint; calling `dlt-mcp` will not run the server because the coroutine is never awaited.</comment>

<file context>
@@ -0,0 +1,65 @@
+]
+
+[project.scripts]
+dlt-mcp = "dlt_mcp.server:main"
+
+[build-system]
</file context>
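
A common fix is to keep the coroutine internal and point the console script at a synchronous wrapper (a sketch, assuming the async entry point is renamed `serve`):

```python
# dlt_mcp/server.py
import asyncio

async def serve() -> None:
    ...  # existing async server setup and run loop

def main() -> None:
    """Synchronous entry point for the `dlt-mcp` console script."""
    asyncio.run(serve())
```

with `[project.scripts]` left pointing at the sync wrapper: `dlt-mcp = "dlt_mcp.server:main"`.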

}

# Validate query is read-only
is_valid, error = validate_sql_query(query)

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P1: Read-only SQL validation is bypassable: multi-statement queries with unblocked commands (e.g., COPY, VACUUM) pass validate_sql_query and would execute despite the read-only check.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py, line 35:

<comment>Read-only SQL validation is bypassable: multi-statement queries with unblocked commands (e.g., `COPY`, `VACUUM`) pass `validate_sql_query` and would execute despite the read-only check.</comment>

<file context>
@@ -0,0 +1,107 @@
+        }
+
+    # Validate query is read-only
+    is_valid, error = validate_sql_query(query)
+    if not is_valid:
+        return {
</file context>


# Add LIMIT if not present and limit is specified
query_upper = query.upper()
if limit > 0 and 'LIMIT' not in query_upper:

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P2: LIMIT safeguard can be bypassed via comments/strings and does not cap oversized LIMITs, allowing unbounded fetches

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py, line 122:

<comment>LIMIT safeguard can be bypassed via comments/strings and does not cap oversized LIMITs, allowing unbounded fetches</comment>

<file context>
@@ -0,0 +1,161 @@
+
+    # Add LIMIT if not present and limit is specified
+    query_upper = query.upper()
+    if limit > 0 and 'LIMIT' not in query_upper:
+        query = f"{query.rstrip(';')} LIMIT {limit}"
+
</file context>
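
One robust alternative is to wrap the already-validated SELECT in a subquery and clamp the limit server-side, rather than string-matching for `LIMIT` (a sketch; `MAX_LIMIT` is an assumed constant, not from this PR):

```python
MAX_LIMIT = 1000  # hard server-side row cap (assumed value)

def apply_limit(query: str, limit: int) -> str:
    """Wrap a validated SELECT so the row cap cannot be bypassed or exceeded."""
    effective = min(limit, MAX_LIMIT) if limit > 0 else MAX_LIMIT
    inner = query.rstrip().rstrip(";")
    return f"SELECT * FROM ({inner}) AS limited_q LIMIT {effective}"
```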

finish = datetime.fromisoformat(load_data["finished_at"].replace('Z', '+00:00'))
duration = (finish - start).total_seconds()
load_data["duration_seconds"] = duration
except Exception:

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P2: Exceptions during load inspection are swallowed, returning partial/misleading results without any error signal.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py, line 76:

<comment>Exceptions during load inspection are swallowed, returning partial/misleading results without any error signal.</comment>

<file context>
@@ -0,0 +1,141 @@
+                                    finish = datetime.fromisoformat(load_data["finished_at"].replace('Z', '+00:00'))
+                                    duration = (finish - start).total_seconds()
+                                    load_data["duration_seconds"] = duration
+                                except Exception:
+                                    pass
+
</file context>

}


def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> list[str]:

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P2: Python 3.8 target but uses 3.9-only list[str] annotation; module import will fail on 3.8

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py, line 253:

<comment>Python 3.8 target but uses 3.9-only `list[str]` annotation; module import will fail on 3.8</comment>

<file context>
@@ -0,0 +1,268 @@
+    }
+
+
+def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> list[str]:
+    """Get next steps for setting up the pipeline."""
+    steps = [
</file context>
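
Two minimal fixes, depending on whether the annotation is ever evaluated at runtime (a sketch):

```python
# Option 1: defer annotation evaluation (valid on Python 3.7+).
from __future__ import annotations

# Option 2: use typing.List, which exists at runtime on 3.8.
from typing import List

def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> List[str]:
    ...
```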

stack_trace=stack_trace,
suggested_fix=_suggest_fix(error_type, message)
))
except Exception:

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P2: Exceptions during error log parsing are swallowed, causing failed scans to return misleading "no_errors_found" and hiding real failures.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py, line 73:

<comment>Exceptions during error log parsing are swallowed, causing failed scans to return misleading "no_errors_found" and hiding real failures.</comment>

<file context>
@@ -0,0 +1,150 @@
+                            stack_trace=stack_trace,
+                            suggested_fix=_suggest_fix(error_type, message)
+                        ))
+                    except Exception:
+                        pass
+        except Exception:
</file context>

except ValueError as e:
# Query validation error
return {
"error": str(e),
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Raw exception details are returned to clients, potentially leaking driver connection info and file paths.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py, line 90:

<comment>Raw exception details are returned to clients, potentially leaking driver connection info and file paths.</comment>

<file context>
@@ -0,0 +1,107 @@
+    except ValueError as e:
+        # Query validation error
+        return {
+            "error": str(e),
+            "error_type": "ValidationError",
+            "query": query,
</file context>

if not pipeline_name:
return None

return pipeline_name

@cubic-dev-ai cubic-dev-ai bot Jan 11, 2026


P2: Pipeline name validation permits ./.., enabling path traversal when names are joined into pipeline directories.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py, line 30:

<comment>Pipeline name validation permits `.`/`..`, enabling path traversal when names are joined into pipeline directories.</comment>

<file context>
@@ -0,0 +1,54 @@
+    if not pipeline_name:
+        return None
+
+    return pipeline_name
+
+
</file context>
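
Restricting names to a safe character set closes the traversal hole (a sketch; the regex is one reasonable policy, not this PR's code):

```python
import re
from typing import Optional

_NAME_RE = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_-]*")  # excludes '.', '/', and '\'

def validate_pipeline_name(pipeline_name: str) -> Optional[str]:
    """Return the name if it is safe to join into a path, else None."""
    if not pipeline_name or not _NAME_RE.fullmatch(pipeline_name):
        return None
    return pipeline_name
```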

@jwrhw7tueydwtt7575g

Hey, please take another look — the failure isn't a dlt MCP problem, it's an environment/network-related one. Please review the code and, once it's fixed, merge my branch.
