Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
fce00d7
Init Commit for Profiler Test Connection
sundarshankar89 Dec 11, 2025
4c750e1
Added an implementation for test connection
sundarshankar89 Dec 11, 2025
dcc324d
Added synapse connection support
sundarshankar89 Dec 30, 2025
94f048e
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Dec 30, 2025
0554147
Added synapse connection ODBC error capturing
sundarshankar89 Jan 2, 2026
dd152fc
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Jan 6, 2026
d6e396b
fmt fixes
sundarshankar89 Jan 6, 2026
5d6b50c
fmt fixes
sundarshankar89 Jan 6, 2026
ecdd54f
fmt fixes
sundarshankar89 Jan 6, 2026
8287803
fmt fixes
sundarshankar89 Jan 6, 2026
772eb35
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Jan 15, 2026
736f56f
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Jan 16, 2026
acbc057
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Jan 23, 2026
5915661
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Jan 28, 2026
c33e8fc
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Feb 2, 2026
516a140
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Feb 5, 2026
31f7b74
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Feb 8, 2026
fd8d501
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Feb 11, 2026
ccd660c
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Feb 11, 2026
b7615d4
Merge branch 'main' into feature/add_test_connection_entry_point
sundarshankar89 Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,9 @@ commands:
flags:
- name: source-tech
description: (Optional) The technology/platform of the sources to Profile

- name: test-profiler-connection
description: (Internal) Test the connection to the source database for profiling
flags:
- name: source-tech
description: The technology/platform of the source to test connection
82 changes: 81 additions & 1 deletion src/databricks/labs/lakebridge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@

from databricks.labs.lakebridge.config import TranspileConfig, LSPConfigOptionV1
from databricks.labs.lakebridge.contexts.application import ApplicationContext
from databricks.labs.lakebridge.connections.credential_manager import cred_file
from databricks.labs.lakebridge.connections.credential_manager import cred_file, create_credential_manager
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
from databricks.labs.lakebridge.connections.env_getter import EnvGetter
from databricks.labs.lakebridge.connections.synapse_helpers import validate_synapse_pools
from databricks.labs.lakebridge.helpers.recon_config_utils import ReconConfigPrompts
from databricks.labs.lakebridge.helpers.telemetry_utils import make_alphanum_or_semver
from databricks.labs.lakebridge.reconcile.runner import ReconcileRunner
Expand Down Expand Up @@ -1037,5 +1040,82 @@ def create_profiler_dashboard(
ctx.dashboard_manager.create_profiler_summary_dashboard(source_tech, catalog_name, schema_name)


def _transform_profiler_credentials(source_tech: str, raw_config: dict) -> dict:
"""Transform source-specific credential structures to flat connection config."""
if source_tech == "synapse":
# Synapse has nested structure: extract workspace config and add database
workspace_config = raw_config.get("workspace", {})
jdbc_config = raw_config.get("jdbc", {})

return {
**workspace_config,
"database": "master", # Use master database for connection test
"auth_type": jdbc_config.get("auth_type", "sql_authentication"),
}
return raw_config


@lakebridge.command()
def test_profiler_connection(w: WorkspaceClient, source_tech: str | None = None) -> None:
    """[Internal] Test the connection to the source database for profiling.

    Reads the stored profiler credentials for *source_tech*, then either
    validates the configured Synapse SQL pools or performs a generic
    connection check. Raises SystemExit on any failure so the CLI exits
    with a non-zero status.
    """
    ctx = ApplicationContext(w)
    ctx.add_user_agent_extra("cmd", "test-profiler-connection")
    prompts = ctx.prompts

    source_tech = (
        source_tech.lower()
        if source_tech
        else prompts.choice("Select the source technology", PROFILER_SOURCE_SYSTEM).lower()
    )

    if source_tech not in PROFILER_SOURCE_SYSTEM:
        logger.error(f"Only the following source systems are supported: {PROFILER_SOURCE_SYSTEM}")
        raise_validation_exception(f"Invalid source technology {source_tech}")

    ctx.add_user_agent_extra("profiler_source_tech", make_alphanum_or_semver(source_tech))
    logger.debug(f"User: {ctx.current_user}")

    # Credentials must have been configured before a connection can be tested.
    if not cred_file(PRODUCT_NAME).exists():
        raise_validation_exception(
            f"Connection details not found. Please run `databricks labs lakebridge configure-database-profiler` "
            f"to set up connection details for {source_tech}."
        )

    logger.info(f"Testing connection for source technology: {source_tech}")

    cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())

    try:
        raw_config = cred_manager.get_credentials(source_tech)
    except KeyError as e:
        logger.error(f"Credential configuration error: {e}")
        raise SystemExit(
            f"Invalid credentials for {source_tech}. Please run `databricks labs lakebridge configure-database-profiler`. Exiting..."
        ) from e

    try:
        if source_tech == "synapse":
            # Synapse validates each enabled SQL pool with its own connection;
            # building a generic DatabaseManager here would be wasted work.
            validate_synapse_pools(raw_config)
        else:
            config = _transform_profiler_credentials(source_tech, raw_config)
            db_manager = DatabaseManager(source_tech, config)
            response = db_manager.check_connection()
            logger.debug(f"Connection response: {response}")
        logger.info("Connection to the source system successful")
    except ConnectionError as e:
        logger.error(f"Failed to connect to the source system: {e}")
        # IM002 is the ODBC SQLSTATE for "data source name not found / no default driver".
        if "IM002" in str(e) or "ODBC driver not found" in str(e):
            raise SystemExit("Missing ODBC driver, Please install pre-req. Exiting...") from e
        raise SystemExit("Connection validation failed. Exiting...") from e
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error(f"Unexpected error during connection test: {e}")
        raise SystemExit("Connection test failed. Exiting...") from e


# Allow invoking the CLI module directly as a script.
if __name__ == "__main__":
    lakebridge()
40 changes: 37 additions & 3 deletions src/databricks/labs/lakebridge/connections/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector
"snowflake": SnowflakeConnector,
"mssql": MSSQLConnector,
"tsql": MSSQLConnector,
"synapse": SynapseConnector,
}

connector_class = connectors.get(db_type.lower())
Expand Down Expand Up @@ -105,16 +106,49 @@ def _connect(self) -> Engine:
return create_engine(connection_string)


class SynapseConnector(MSSQLConnector):
    """Azure Synapse SQL pool connector.

    Synapse SQL pools speak the same wire protocol as SQL Server, so this
    class is a thin adapter: it translates Synapse-style configuration keys
    (pool endpoints, ``sql_user``/``sql_password``) into the flat shape
    MSSQLConnector expects, then delegates everything else to the parent.
    """

    def __init__(self, config: dict[str, Any]):
        # Which pool endpoint to target; defaults to the dedicated pool.
        endpoint = config.get('endpoint_key', 'dedicated_sql_endpoint')

        # An explicit "server" wins; otherwise fall back to the selected
        # endpoint, and finally to the dedicated endpoint.
        server = config.get('server')
        if not server:
            server = config.get(endpoint, config.get('dedicated_sql_endpoint'))

        translated = {
            'driver': config['driver'],
            'server': server,
            'database': config['database'],
            'user': config.get('user') or config.get('sql_user'),
            'password': config.get('password') or config.get('sql_password'),
            'port': config.get('port', 1433),
            'auth_type': config.get('auth_type', 'sql_authentication'),
        }

        # MSSQLConnector.__init__ consumes the translated config and
        # calls _connect() to build the engine.
        super().__init__(translated)


class DatabaseManager:
def __init__(self, db_type: str, config: dict[str, Any]):
    # Resolve db_type (e.g. "mssql", "synapse") to a concrete connector
    # implementation via the registry in _create_connector.
    self.connector = _create_connector(db_type, config)

def fetch(self, query: str) -> FetchResult:
    """Run *query* via the underlying connector and return its result.

    Raises:
        ConnectionError: when the driver reports an OperationalError; the
            original exception is chained so the root cause stays visible.
    """
    try:
        return self.connector.fetch(query)
    except OperationalError as e:
        # Include the driver error detail and chain the cause (the old
        # version logged a generic message and raised `from None`).
        error_msg = f"Error connecting to the database: {e}"
        logger.error(error_msg)
        raise ConnectionError(error_msg) from e

def check_connection(self) -> bool:
query = "SELECT 101 AS test_column"
Expand Down
97 changes: 97 additions & 0 deletions src/databricks/labs/lakebridge/connections/synapse_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import logging
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager

logger = logging.getLogger(__name__)


def _test_pool_connection(pool_name: str, base_config: dict, endpoint_key: str) -> tuple[bool, str | None]:
    """Attempt a connection to one Synapse SQL pool, disposing the engine afterwards.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_message)``.
    """
    logger.info(f"Testing connection to {pool_name} SQL pool...")
    manager = None

    try:
        manager = DatabaseManager("synapse", {**base_config, "endpoint_key": endpoint_key})
        if not manager.check_connection():
            logger.error(f"✗ {pool_name.capitalize()} SQL pool connection failed")
            return False, f"{pool_name.capitalize()} SQL pool connection check failed"
        logger.info(f"✓ {pool_name.capitalize()} SQL pool connection successful")
        return True, None
    except Exception as e:  # pylint: disable=broad-exception-caught
        # Report rather than raise: any failure mode (network, auth, config,
        # driver) becomes a (False, message) result for the caller.
        error_msg = f"Failed to connect to {pool_name} SQL pool: {e}"
        logger.error(f"✗ {error_msg}")
        return False, error_msg
    finally:
        # Release pooled connections held by the SQLAlchemy engine.
        if manager and hasattr(manager, 'connector') and hasattr(manager.connector, 'engine'):
            manager.connector.engine.dispose()
            logger.debug(f"Disposed engine for {pool_name} SQL pool")


def validate_synapse_pools(raw_config: dict) -> None:
    """
    Validate connections to enabled Synapse SQL pools based on profiler configuration.

    Each pool is tested with its own short-lived connection, which is disposed
    after the check to prevent resource leaks.

    Args:
        raw_config: Stored synapse credentials with ``workspace``, ``jdbc``
            and ``profiler`` sections.

    Raises:
        ValueError: if the profiler configuration excludes every SQL pool.
        ConnectionError: if any enabled pool fails its connection check.

    Example:
        >>> config = {
        ...     'workspace': {
        ...         'dedicated_sql_endpoint': 'workspace.sql.azuresynapse.net',
        ...         'serverless_sql_endpoint': 'workspace-ondemand.sql.azuresynapse.net',
        ...         'sql_user': 'admin',
        ...         'sql_password': 'pass',
        ...         'driver': 'ODBC Driver 18 for SQL Server',
        ...     },
        ...     'jdbc': {'auth_type': 'sql_authentication'},
        ...     'profiler': {'exclude_serverless_sql_pool': False},
        ... }
        >>> validate_synapse_pools(config)  # Tests both pools
    """
    workspace_config = raw_config.get("workspace", {})
    jdbc_config = raw_config.get("jdbc", {})
    profiler_config = raw_config.get("profiler", {})

    # Base connection settings shared by every pool; tests target master.
    base_config = {
        **workspace_config,
        "database": "master",
        "auth_type": jdbc_config.get("auth_type", "sql_authentication"),
    }

    # Each pool maps to (exclusion flag, endpoint key); a pool is tested
    # unless its flag is set. Data-driven instead of copy-pasted per pool.
    pool_specs = {
        "dedicated": ("exclude_dedicated_sql_pools", "dedicated_sql_endpoint"),
        "serverless": ("exclude_serverless_sql_pool", "serverless_sql_endpoint"),
    }
    enabled = {
        pool: endpoint_key
        for pool, (exclude_flag, endpoint_key) in pool_specs.items()
        if not profiler_config.get(exclude_flag, False)
    }

    if not enabled:
        logger.warning("Both dedicated and serverless SQL pools are excluded in profiler configuration")
        raise ValueError("No SQL pools enabled for testing")

    # Test enabled pools sequentially, collecting failures and messages.
    failed_pools: list[str] = []
    error_messages: dict[str, str] = {}
    for pool, endpoint_key in enabled.items():
        success, error_msg = _test_pool_connection(pool, base_config, endpoint_key)
        if not success:
            failed_pools.append(pool)
        if error_msg:
            error_messages[pool] = error_msg

    if failed_pools:
        error_details = "; ".join(f"{pool}: {error_messages.get(pool, 'Unknown error')}" for pool in failed_pools)
        raise ConnectionError(f"Connection failed for SQL pools - {error_details}")
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ def get_sqlpool_reader(
endpoint_key: str = 'dedicated_sql_endpoint',
auth_type: str = 'sql_authentication',
) -> DatabaseManager:
"""
Create Synapse SQL pool reader.
"""
config = {
"endpoint_key": endpoint_key,
"driver": input_cred['driver'],
"server": input_cred[endpoint_key],
"database": db_name,
"sql_user": input_cred['sql_user'],
"sql_password": input_cred['sql_password'],
"user": input_cred['sql_user'],
"password": input_cred['sql_password'],
"port": input_cred.get('port', 1433),
"auth_type": auth_type,
}
# synapse and mssql use the same connector
source = "mssql"
# Use synapse connector which inherits from mssql
source = "synapse"

return DatabaseManager(source, config)
71 changes: 71 additions & 0 deletions tests/integration/connections/test_synapse_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import pytest
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, SynapseConnector
from databricks.labs.lakebridge.resources.assessments.synapse.common.connector import get_sqlpool_reader


@pytest.fixture()
def sandbox_synapse_config(sandbox_sqlserver_config) -> dict:
    """Convert SQL Server config to Synapse config format.

    SQL Server doubles as a Synapse stand-in in integration tests: the wire
    protocol is identical, only the credential key names differ.
    """
    key_mapping = {
        "dedicated_sql_endpoint": "server",
        "sql_user": "user",
        "sql_password": "password",
        "driver": "driver",
        "database": "database",
    }
    return {synapse_key: sandbox_sqlserver_config[mssql_key] for synapse_key, mssql_key in key_mapping.items()}


@pytest.fixture()
def sandbox_synapse(sandbox_synapse_config) -> DatabaseManager:
    """A DatabaseManager wired to the synapse connector for the sandbox DB."""
    manager = DatabaseManager("synapse", sandbox_synapse_config)
    return manager


def test_synapse_connector_connection(sandbox_synapse):
    """The manager must be backed by a SynapseConnector instance."""
    connector = sandbox_synapse.connector
    assert isinstance(connector, SynapseConnector)


def test_synapse_connector_execute_query(sandbox_synapse):
    """A trivial SELECT round-trips through the SynapseConnector."""
    rows = sandbox_synapse.fetch("SELECT 101 AS test_column").rows
    assert rows[0][0] == 101


def test_synapse_connection_check(sandbox_synapse):
    """check_connection reports success against the sandbox database."""
    is_reachable = sandbox_synapse.check_connection()
    assert is_reachable


def test_get_sqlpool_reader_dedicated(sandbox_synapse_config):
    """get_sqlpool_reader wires the dedicated endpoint to a SynapseConnector."""
    manager = get_sqlpool_reader(
        sandbox_synapse_config,
        sandbox_synapse_config["database"],
        endpoint_key='dedicated_sql_endpoint',
        auth_type='sql_authentication',
    )

    assert isinstance(manager, DatabaseManager)
    assert isinstance(manager.connector, SynapseConnector)
    assert manager.check_connection()


def test_get_sqlpool_reader_query(sandbox_synapse_config):
    """A reader built by get_sqlpool_reader can execute queries."""
    reader = get_sqlpool_reader(
        sandbox_synapse_config,
        sandbox_synapse_config["database"],
        endpoint_key='dedicated_sql_endpoint',
    )

    rows = reader.fetch("SELECT 202 AS test_column").rows
    assert rows[0][0] == 202
Loading