Skip to content

Commit 2f48a99

Browse files
author
Ilyas Gasanov
committed
[DOP-31720] Add SQL transformation to API
1 parent fe8ec6e commit 2f48a99

File tree

5 files changed

+200
-3
lines changed

5 files changed

+200
-3
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add SQL transformation to API

syncmaster/schemas/v1/transfers/transformations/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
from syncmaster.schemas.v1.transfers.transformations.file_metadata_filter import (
1515
FileMetadataFilter,
1616
)
17+
from syncmaster.schemas.v1.transfers.transformations.sql import (
18+
Sql,
19+
)
1720

1821
TransformationSchema = Annotated[
19-
DataframeRowsFilter | DataframeColumnsFilter | FileMetadataFilter,
22+
DataframeRowsFilter | DataframeColumnsFilter | FileMetadataFilter | Sql,
2023
Field(discriminator="type"),
2124
]
2225

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# SPDX-FileCopyrightText: 2023-present MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
import re
4+
from typing import Literal
5+
6+
from pydantic import BaseModel, field_validator
7+
8+
ALLOWED_SELECT_QUERY = re.compile(r"^\s*(SELECT|WITH)\b", re.IGNORECASE)
9+
10+
11+
class Sql(BaseModel):
    """Schema of a transformation tagged ``"sql"``.

    Holds the query text and the dialect it is written in. ``dialect``
    accepts only ``"spark"`` and defaults to it.
    """

    type: Literal["sql"]
    query: str
    dialect: Literal["spark"] = "spark"

    @field_validator("query", mode="after")
    @classmethod
    def _validate_query(cls, value: str) -> str:
        """Accept the query only if it matches ``ALLOWED_SELECT_QUERY``.

        Returns the value unchanged on success.

        Raises:
            ValueError: if the query does not match the pattern.
        """
        # NOTE(review): the pattern only inspects the leading keyword, so a
        # statement that opens with WITH but continues as something other
        # than a SELECT would still pass -- confirm whether the target
        # dialect allows that form.
        if ALLOWED_SELECT_QUERY.match(value) is not None:
            return value

        raise ValueError(f"Query must be a SELECT statement, got '{value}'")

syncmaster/schemas/v1/transformation_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
66
DATAFRAME_COLUMNS_FILTER = Literal["dataframe_columns_filter"]
77
FILE_METADATA_FILTER = Literal["file_metadata_filter"]
8+
SQL = Literal["sql"]

tests/test_unit/test_transfers/test_create_transfer.py

Lines changed: 171 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,13 +550,13 @@ async def test_superuser_can_create_transfer(
550550
"message": (
551551
"Input tag 'some unknown transformation type' found using 'type' "
552552
"does not match any of the expected tags: 'dataframe_rows_filter', "
553-
"'dataframe_columns_filter', 'file_metadata_filter'"
553+
"'dataframe_columns_filter', 'file_metadata_filter', 'sql'"
554554
),
555555
"code": "union_tag_invalid",
556556
"context": {
557557
"discriminator": "'type'",
558558
"expected_tags": (
559-
"'dataframe_rows_filter', 'dataframe_columns_filter', 'file_metadata_filter'"
559+
"'dataframe_rows_filter', 'dataframe_columns_filter', 'file_metadata_filter', 'sql'"
560560
),
561561
"tag": "some unknown transformation type",
562562
},
@@ -809,6 +809,72 @@ async def test_superuser_can_create_transfer(
809809
},
810810
id="invalid_name_regexp",
811811
),
812+
pytest.param(
813+
{
814+
"transformations": [
815+
{
816+
"type": "sql",
817+
"query": "INSERT INTO table1 VALUES (1, 2)",
818+
"dialect": "spark",
819+
},
820+
],
821+
},
822+
{
823+
"error": {
824+
"code": "invalid_request",
825+
"message": "Invalid request",
826+
"details": [
827+
{
828+
"location": [
829+
"body",
830+
"transformations",
831+
0,
832+
"sql",
833+
"query",
834+
],
835+
"message": "Value error, Query must be a SELECT statement, got 'INSERT INTO table1 VALUES (1, 2)'",
836+
"code": "value_error",
837+
"context": {},
838+
"input": "INSERT INTO table1 VALUES (1, 2)",
839+
},
840+
],
841+
},
842+
},
843+
id="sql_non_select_query",
844+
),
845+
pytest.param(
846+
{
847+
"transformations": [
848+
{
849+
"type": "sql",
850+
"query": None,
851+
"dialect": "spark",
852+
},
853+
],
854+
},
855+
{
856+
"error": {
857+
"code": "invalid_request",
858+
"message": "Invalid request",
859+
"details": [
860+
{
861+
"location": [
862+
"body",
863+
"transformations",
864+
0,
865+
"sql",
866+
"query",
867+
],
868+
"message": "Input should be a valid string",
869+
"code": "string_type",
870+
"context": {},
871+
"input": None,
872+
},
873+
],
874+
},
875+
},
876+
id="sql_non_string_query",
877+
),
812878
pytest.param(
813879
{
814880
"resources": {
@@ -1290,3 +1356,106 @@ async def test_superuser_cannot_create_transfer_with_unknown_queue_error(
12901356
"details": None,
12911357
},
12921358
}
1359+
1360+
1361+
async def test_developer_plus_can_create_transfer_with_sql_transformation(
    client: AsyncClient,
    two_group_connections: tuple[MockConnection, MockConnection],
    session: AsyncSession,
    role_developer_plus: UserTestRoles,
    group_queue: Queue,
    mock_group: MockGroup,
):
    """Create a transfer with one SQL transformation as a group member.

    The member is resolved from ``role_developer_plus``. The request must
    return HTTP 200, and the response body must equal the field values of
    the ``Transfer`` row found by name and group id.
    """
    source_conn, target_conn = two_group_connections
    member = mock_group.get_member_of_role(role_developer_plus)

    payload = {
        "group_id": mock_group.group.id,
        "name": "new test transfer with sql",
        "source_connection_id": source_conn.id,
        "target_connection_id": target_conn.id,
        "source_params": {"type": "postgres", "table_name": "schema.source_table"},
        "target_params": {"type": "postgres", "table_name": "schema.target_table"},
        "transformations": [
            {
                "type": "sql",
                "query": "SELECT * FROM table",
                "dialect": "spark",
            },
        ],
        "queue_id": group_queue.id,
    }
    auth_headers = {"Authorization": f"Bearer {member.token}"}

    response = await client.post("v1/transfers", headers=auth_headers, json=payload)
    assert response.status_code == 200, response.text

    # Look the freshly created row up by the same name and group used above.
    lookup = select(Transfer).filter_by(
        name="new test transfer with sql",
        group_id=mock_group.group.id,
    )
    found = await session.scalars(lookup)
    transfer = found.one()

    body = response.json()
    # Every response key is expected to mirror the same-named attribute
    # of the stored transfer.
    mirrored_fields = (
        "id",
        "group_id",
        "name",
        "description",
        "schedule",
        "is_scheduled",
        "source_connection_id",
        "target_connection_id",
        "source_params",
        "target_params",
        "strategy_params",
        "transformations",
        "resources",
        "queue_id",
    )
    expected = {field: getattr(transfer, field) for field in mirrored_fields}
    assert body == expected
1419+
1420+
1421+
@pytest.mark.parametrize(
    "query",
    [
        "SELECT col1, col2 FROM table1 WHERE col1 > 100;",
        "select col1 from table1",
        " SELECT col1 FROM table1 ",
        " WITH some AS (SELECT col1 FROM table1) SELECT * FROM some;",
    ],
)
async def test_sql_transformation_with_valid_select_queries(
    client: AsyncClient,
    two_group_connections: tuple[MockConnection, MockConnection],
    session: AsyncSession,
    superuser: MockUser,
    group_queue: Queue,
    mock_group: MockGroup,
    query: str,
):
    """Create a transfer as superuser for each parametrized query.

    Each case posts a single SQL transformation carrying ``query`` and
    expects HTTP 200.
    """
    source_conn, target_conn = two_group_connections

    sql_transformation = {
        "type": "sql",
        "query": query,
        "dialect": "spark",
    }
    payload = {
        "group_id": mock_group.id,
        "name": f"test transfer sql {query}",
        "source_connection_id": source_conn.id,
        "target_connection_id": target_conn.id,
        "source_params": {"type": "postgres", "table_name": "schema.source_table"},
        "target_params": {"type": "postgres", "table_name": "schema.target_table"},
        "transformations": [sql_transformation],
        "queue_id": group_queue.id,
    }
    auth_headers = {"Authorization": f"Bearer {superuser.token}"}

    response = await client.post("v1/transfers", headers=auth_headers, json=payload)
    assert response.status_code == 200, response.text

0 commit comments

Comments
 (0)