Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ An open-source, extensible test suite that validates Posit Team deployments are
installed correctly and functioning properly.

VIP uses **BDD-style tests** (pytest-bdd + Playwright) to verify Connect,
Workbench, and Package Manager. Results are compiled into an **HTML report**
that can be published to a Connect server.
Workbench, and Package Manager across standalone, Kubernetes, and Snowflake
Native App deployments. Results are compiled into an **HTML report** that can
be published to a Connect server.

**Documentation:** https://posit-dev.github.io/vip/

Expand Down
99 changes: 99 additions & 0 deletions selftests/test_client_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Tests for vip.client_auth — pluggable HTTP-client auth registry."""

from __future__ import annotations

import httpx
import pytest

from vip import client_auth
from vip.client_auth import (
build_client_auth,
get_client_auth_factory,
register_client_auth,
)
from vip.clients.base import BaseClient
from vip.config import VIPConfig


@pytest.fixture(autouse=True)
def _restore_registry():
"""Snapshot and restore the module-level registry around each test."""
saved = dict(client_auth._CLIENT_AUTH_FACTORIES)
try:
yield
finally:
client_auth._CLIENT_AUTH_FACTORIES.clear()
client_auth._CLIENT_AUTH_FACTORIES.update(saved)


class _DummyAuth(httpx.Auth):
def auth_flow(self, request):
yield request


def _config(idp: str = "") -> VIPConfig:
cfg = VIPConfig()
cfg.auth.idp = idp
return cfg


class TestRegistry:
def test_register_and_get_round_trip(self):
def factory(config, product, base_url):
return None

register_client_auth("snowflake", factory)
assert get_client_auth_factory("snowflake") is factory

def test_lookup_is_case_insensitive(self):
def factory(config, product, base_url):
return None

register_client_auth("Snowflake", factory)
assert get_client_auth_factory(" SNOWFLAKE ") is factory

def test_unregistered_returns_none(self):
assert get_client_auth_factory("nope") is None


class TestBuildClientAuth:
def test_no_idp_returns_none(self):
assert build_client_auth(_config(""), "connect", "https://x") is None

def test_unregistered_idp_returns_none(self):
assert build_client_auth(_config("snowflake"), "connect", "https://x") is None

def test_invokes_factory_with_product_and_url(self):
calls = []
auth = _DummyAuth()

def factory(config, product, base_url):
calls.append((product, base_url))
return auth

register_client_auth("snowflake", factory)
result = build_client_auth(_config("snowflake"), "workbench", "https://wb")
assert result is auth
assert calls == [("workbench", "https://wb")]

def test_factory_may_return_none(self):
register_client_auth("snowflake", lambda c, p, u: None)
assert build_client_auth(_config("snowflake"), "connect", "https://x") is None


class TestBaseClientAuthInjection:
def test_auth_is_forwarded_to_httpx_client(self):
auth = _DummyAuth()
client = BaseClient("https://example.com", auth=auth)
try:
assert client._client.auth is auth
finally:
client.close()

def test_default_has_no_auth(self):
client = BaseClient("https://example.com", auth_header_value="Key abc")
try:
# httpx represents "no auth" as its internal sentinel, not our auth.
assert not isinstance(client._client.auth, _DummyAuth)
finally:
client.close()
74 changes: 73 additions & 1 deletion selftests/test_idp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest

from vip.auth import AuthConfigError
from vip.idp import SUPPORTED_IDPS, get_idp_strategy
from vip.idp import SUPPORTED_IDPS, _fill_snowflake_login, get_idp_strategy


class TestGetIdpStrategy:
Expand All @@ -19,6 +19,10 @@ def test_okta_returns_callable(self):
strategy = get_idp_strategy("okta")
assert callable(strategy)

def test_snowflake_returns_callable(self):
strategy = get_idp_strategy("snowflake")
assert callable(strategy)

def test_unknown_idp_raises(self):
with pytest.raises(AuthConfigError, match="Unsupported IdP.*unknown.*keycloak.*okta"):
get_idp_strategy("unknown")
Expand All @@ -27,10 +31,78 @@ def test_case_insensitive_lookup(self):
assert get_idp_strategy("Keycloak") is get_idp_strategy("keycloak")
assert get_idp_strategy("OKTA") is get_idp_strategy("okta")
assert get_idp_strategy(" Okta ") is get_idp_strategy("okta")
assert get_idp_strategy("Snowflake") is get_idp_strategy("snowflake")

def test_supported_idps_contains_expected(self):
assert "keycloak" in SUPPORTED_IDPS
assert "okta" in SUPPORTED_IDPS
assert "snowflake" in SUPPORTED_IDPS


class TestSnowflakeLogin:
"""Behavioural tests for the Snowflake OAuth form-fill strategy.

The strategy loops, filling one sign-in form per Snowflake OAuth hop
until no form appears. The page mock controls how many hops present a
form via ``username_loc.wait_for`` side effects.
"""

def _make_page(self, *, num_forms: int, consent_visible: bool):
from playwright.sync_api import TimeoutError as PlaywrightTimeout

from vip.idp import _SF_PASSWORD, _SF_SUBMIT, _SF_USERNAME

username_loc = MagicMock(name="username_loc")
# wait_for succeeds for `num_forms` hops, then times out to end the loop.
username_loc.wait_for.side_effect = [None] * num_forms + [PlaywrightTimeout("no form")]
password_loc = MagicMock(name="password_loc")
submit_loc = MagicMock(name="submit_loc")
allow_button = MagicMock(name="allow_button")
if not consent_visible:
allow_button.wait_for.side_effect = PlaywrightTimeout("no consent screen")

locators = {_SF_USERNAME: username_loc, _SF_PASSWORD: password_loc, _SF_SUBMIT: submit_loc}
page = MagicMock(name="page")
page.locator.side_effect = lambda sel: locators[sel]
page.get_by_role.return_value = allow_button
page.url = "https://acct.snowflakecomputing.com/oauth/authorize"
return page, username_loc, password_loc, submit_loc, allow_button

def test_fills_credentials_and_submits_second_signin(self):
page, username_loc, password_loc, submit_loc, _ = self._make_page(
num_forms=1, consent_visible=True
)
_fill_snowflake_login(page, "user@example.com", "s3cret")

username_loc.fill.assert_called_once_with("user@example.com")
password_loc.fill.assert_called_once_with("s3cret")
# The username/password "Sign in" is the *second* button.
submit_loc.nth.assert_any_call(1)
submit_loc.nth(1).click.assert_called()

def test_clicks_allow_when_consent_shown(self):
page, _, _, _, allow_button = self._make_page(num_forms=1, consent_visible=True)
_fill_snowflake_login(page, "user", "pass")
allow_button.click.assert_called_once()

def test_consent_screen_is_optional(self):
page, _, _, _, allow_button = self._make_page(num_forms=1, consent_visible=False)
# Must not raise when the consent screen never appears.
_fill_snowflake_login(page, "user", "pass")
allow_button.click.assert_not_called()

def test_fills_every_form_in_the_multi_hop_chain(self):
# Two Snowflake hops (product-host ingress, then controller-host)
# must each get the credentials filled — the "double auth".
page, username_loc, _, _, _ = self._make_page(num_forms=2, consent_visible=False)
_fill_snowflake_login(page, "user", "pass")
assert username_loc.fill.call_count == 2

def test_stops_when_no_form_appears(self):
# No sign-in form at all (e.g. an already-active session): no fill.
page, username_loc, _, _, _ = self._make_page(num_forms=0, consent_visible=False)
_fill_snowflake_login(page, "user", "pass")
username_loc.fill.assert_not_called()


class TestKeycloakUsesTotpGetCode:
Expand Down
8 changes: 5 additions & 3 deletions src/vip/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def start_headless_auth(

At least one of *connect_url* or *workbench_url* must be provided.
The *idp* parameter selects which form automation strategy to use
(e.g. ``"keycloak"``, ``"okta"``).
(e.g. ``"keycloak"``, ``"okta"``, ``"snowflake"``).

When *insecure* is ``True``, Playwright ignores TLS certificate errors.
When *ca_bundle* is set, the path is exported as ``NODE_EXTRA_CA_CERTS``
Expand Down Expand Up @@ -556,12 +556,14 @@ def start_headless_auth(
uses_idp = provider.strip().lower() in _IDP_PROVIDERS
fill_login = None
if uses_idp:
from vip.idp import SUPPORTED_IDPS, get_idp_strategy

if not idp:
supported = ", ".join(f'"{name}"' for name in sorted(SUPPORTED_IDPS))
raise AuthConfigError(
f"--headless-auth with provider={provider!r} requires"
' [auth] idp in vip.toml (supported: "keycloak", "okta")'
f" [auth] idp in vip.toml (supported: {supported})"
)
from vip.idp import get_idp_strategy

fill_login = get_idp_strategy(idp)

Expand Down
2 changes: 1 addition & 1 deletion src/vip/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ def main() -> None:
auth_group.add_argument(
"--idp",
default=None,
help='Identity provider for --headless-auth: "keycloak", "okta". '
help='Identity provider for --headless-auth: "keycloak", "okta", "snowflake". '
'Presence implies provider = "oidc" unless overridden in vip.toml.',
)
verify_parser.add_argument(
Expand Down
79 changes: 79 additions & 0 deletions src/vip/client_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Pluggable HTTP-client authentication registry.

VIP's product API clients (``ConnectClient``, ``WorkbenchClient``,
``PackageManagerClient``) normally authenticate with a static product
credential — a Connect API key, a Package Manager token, etc. Some
deployments front the products with an external authenticator that needs a
different, possibly per-request or per-host, scheme that a single static
``Authorization`` header cannot express.

The motivating case is a Snowflake Native App, where each product sits behind
a Snowpark Container Services ingress that expects a short-lived
``Authorization: Snowflake Token="..."`` header derived per host via a JWT
token exchange. Rather than teach VIP about Snowflake, this registry lets a
downstream extension register an :class:`httpx.Auth` factory keyed by IdP
name. The product-client fixtures consult the registry and inject the
returned auth into the client.

Downstream usage (e.g. from an extension ``conftest.py``):

from vip.client_auth import register_client_auth

def _snowflake_auth(config, product, base_url):
return MySnowflakeHttpxAuth(...)

register_client_auth("snowflake", _snowflake_auth)

The registry is keyed by the configured ``[auth] idp`` value, so the same IdP
name selects both the browser login strategy (:mod:`vip.idp`) and the HTTP
client auth.
"""

from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING

import httpx

if TYPE_CHECKING:
from vip.config import VIPConfig

# A factory receives the loaded config, the product name ("connect",
# "workbench", "package_manager") and that product's base URL, and returns an
# ``httpx.Auth`` to use for the client — or ``None`` to fall back to the
# default static-credential behaviour.
ClientAuthFactory = Callable[["VIPConfig", str, str], "httpx.Auth | None"]

_CLIENT_AUTH_FACTORIES: dict[str, ClientAuthFactory] = {}


def register_client_auth(idp: str, factory: ClientAuthFactory) -> None:
"""Register an :class:`httpx.Auth` *factory* for the given *idp* name.

The *idp* value is normalized (stripped, lowercased). Re-registering an
IdP replaces the previous factory.
"""
_CLIENT_AUTH_FACTORIES[idp.strip().lower()] = factory


def get_client_auth_factory(idp: str) -> ClientAuthFactory | None:
"""Return the registered factory for *idp*, or ``None`` if unregistered."""
return _CLIENT_AUTH_FACTORIES.get(idp.strip().lower())


def build_client_auth(config: VIPConfig, product: str, base_url: str) -> httpx.Auth | None:
"""Build an :class:`httpx.Auth` for *product* from the registered factory.

Looks up the factory for the configured ``[auth] idp`` and invokes it.
Returns ``None`` when no IdP is configured, no factory is registered for
it, or the factory itself returns ``None`` — in which case clients use
their default static-credential auth.
"""
idp = (config.auth.idp or "").strip().lower()
if not idp:
return None
factory = _CLIENT_AUTH_FACTORIES.get(idp)
if factory is None:
return None
return factory(config, product, base_url)
Loading
Loading