Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions alembic/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,16 @@ def %(name)s%(args)s:
return register

@classmethod
def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]:
def implementation_for(
cls, op_cls: Any, replace: bool = False
) -> Callable[[_C], _C]:
"""Register an implementation for a given :class:`.MigrateOperation`.

:param replace: optional flag that allows to replace an already
registered implementation. Default: False.

.. versionadded:: 1.17.2

This is part of the operation extensibility API.

.. seealso::
Expand All @@ -214,7 +221,7 @@ def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]:
"""

def decorate(fn: _C) -> _C:
cls._to_impl.dispatch_for(op_cls)(fn)
cls._to_impl.dispatch_for(op_cls, replace=replace)(fn)
return fn

return decorate
Expand Down
17 changes: 17 additions & 0 deletions alembic/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ def connection(self):

_connection_fixture_connection = None

@testing.fixture
def restore_operations(self):
"""Restore runners for modified operations"""

saved_impls = None
op_cls = None

def _save_attrs(_op_cls):
nonlocal saved_impls, op_cls
saved_impls = _op_cls._to_impl._registry.copy()
op_cls = _op_cls

yield _save_attrs

if op_cls is not None and saved_impls is not None:
op_cls._to_impl._registry = saved_impls

@config.fixture()
def metadata(self, request):
"""Provide bound MetaData for a single test, dropping afterwards."""
Expand Down
9 changes: 7 additions & 2 deletions alembic/util/langhelpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,18 @@ def __init__(self, uselist: bool = False) -> None:
self.uselist = uselist

def dispatch_for(
self, target: Any, qualifier: str = "default"
self, target: Any, qualifier: str = "default", replace: bool = False
) -> Callable[[_C], _C]:
def decorate(fn: _C) -> _C:
if self.uselist:
self._registry.setdefault((target, qualifier), []).append(fn)
else:
assert (target, qualifier) not in self._registry
if (target, qualifier) in self._registry and not replace:
raise ValueError(
"Can not set dispatch function for object "
f"{target!r}: key already exists. To replace "
"existing function, use replace=True."
)
self._registry[(target, qualifier)] = fn
return fn

Expand Down
84 changes: 84 additions & 0 deletions tests/test_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from alembic.operations import Operations
from alembic.operations import ops
from alembic.operations import schemaobj
from alembic.operations.toimpl import create_table as _create_table
from alembic.operations.toimpl import drop_table as _drop_table
from alembic.testing import assert_raises_message
from alembic.testing import combinations
from alembic.testing import config
Expand Down Expand Up @@ -1362,6 +1364,7 @@ def test_create_table_literal_binds(self):

class CustomOpTest(TestBase):
def test_custom_op(self):

@Operations.register_operation("create_sequence")
class CreateSequenceOp(MigrateOperation):
"""Create a SEQUENCE."""
Expand All @@ -1385,6 +1388,87 @@ def create_sequence(operations, operation):
op.create_sequence("foob")
context.assert_("CREATE SEQUENCE foob")

def test_replace_op(self, restore_operations):
restore_operations(Operations)
context = op_fixture()

log_table = Table(
"log_table",
MetaData(),
Column("action", String),
Column("table_name", String),
)

@Operations.implementation_for(ops.CreateTableOp, replace=True)
def create_table_proxy_log(operations, operation):
_create_table(operations, operation)
operations.execute(
log_table.insert().values(["create", operation.table_name])
)

op.create_table("some_table", Column("colname", Integer))

@Operations.implementation_for(ops.CreateTableOp, replace=True)
def create_table_proxy_invert(operations, operation):
_drop_table(operations, ops.DropTableOp(operation.table_name))
operations.execute(
log_table.insert().values(["delete", operation.table_name])
)

op.create_table("some_table")

context.assert_(
"CREATE TABLE some_table (colname INTEGER)",
"INSERT INTO log_table (action, table_name) "
"VALUES (:action, :table_name)",
"DROP TABLE some_table",
"INSERT INTO log_table (action, table_name) "
"VALUES (:action, :table_name)",
)

def test_replace_error(self):
with expect_raises_message(
ValueError,
"Can not set dispatch function for object "
"<class 'alembic.operations.ops.CreateTableOp'>: "
"key already exists. To replace existing function, use "
"replace=True.",
):

@Operations.implementation_for(ops.CreateTableOp)
def create_table(operations, operation):
pass

def test_replace_custom_op(self, restore_operations):
restore_operations(Operations)
context = op_fixture()

@Operations.register_operation("create_user")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this leaves around the "create_user" operation, which is not a big deal for now, unless we had a lot of other tests adding custom ops and we had a name collision. i guess we dont have "unregister_operation" right now

Copy link
Contributor Author

@justanothercatgirl justanothercatgirl Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to name operation _create_user, but langhelpers.py:117 does not allow that

    @classmethod
    def _add_proxied_attribute(cls,  methname, globals_, locals_, attr_names):
        if not methname.startswith("_"): # this line
            meth = getattr(cls, methname)
            if callable(meth):
                locals_[methname] = cls._create_method_proxy(
                    methname, globals_, locals_
                )
            else:
                attr_names.add(methname)

gonna fix the rest today, thanks for taking your time to point everything out :)

Copy link
Contributor Author

@justanothercatgirl justanothercatgirl Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make a fixture that completely restored Operations class and methods in op and it almost works really good (no formatting for now):

    @testing.fixture
    def restore_operations(self):
        saved_attrs = {}
        op_cls = None
        def _save_attrs(_op_cls):
            nonlocal op_cls
            # i know .copy() does not perform a deep copy, 
            # but it'll do for saving methods
            saved_attrs.update(_op_cls.__dict__.copy())
            op_cls = _op_cls

        yield _save_attrs
        for name, val in saved_attrs.items():
            if not name.startswith("__") and callable(val):
                setattr(op_cls, name, val)
        # this loop is pointless (explained below)
        for name, val in op_cls.__dict__.copy().items():
            if name not in saved_attrs:
                delattr(op_cls, name)

It does should restore changed implementations. It does not delete added operations, because _ModuleClsMeta lacks __delattr__ and ModuleClsProxy lacks _remove_proxied_attribute.

I don't think adding an ability to remove operations is needed (looks like "solving a non-existent problem" type of thing), but if you think it should be added, I will try. Other than that, I fixed everything you pointed out and will do a follow-up commit soon.

If you think this fixture is a bad idea whatsoever, I'll move restoring an operation before the assertion so that it definitely gets called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at register_operation it seems like you could call it over and over again for the same name anyway so yes I would not worry about it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would think "unregister_operation" would just be delattr(op_cls, name), but i havent worked w/ this code in many years

class CreateUserOp(MigrateOperation):
def __init__(self, user_name):
self.user_name = user_name

@classmethod
def create_user(cls, operations, user_name):
op = CreateUserOp(user_name)
return operations.invoke(op)

@Operations.implementation_for(CreateUserOp, replace=True)
def create_user(operations, operation):
operations.execute("CREATE USER %s" % operation.user_name)

op.create_user("bob")

@Operations.implementation_for(CreateUserOp, replace=True)
def create_user_alternative(operations, operation):
operations.execute(
"CREATE ROLE %s WITH LOGIN" % operation.user_name
)

op.create_user("bob")

context.assert_("CREATE USER bob", "CREATE ROLE bob WITH LOGIN")


class ObjectFromToTest(TestBase):
"""Test operation round trips for to_obj() / from_obj().
Expand Down