Skip to content

Commit 55ab687

Browse files
authored
feat: git integration migration to new tables [CM-875] (#3735)
1 parent f71c658 commit 55ab687

File tree

2 files changed

+88
-95
lines changed

2 files changed

+88
-95
lines changed

services/apps/git_integration/src/crowdgit/database/crud.py

Lines changed: 86 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from datetime import datetime
2-
from typing import Any
32

43
from loguru import logger
54
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
@@ -17,27 +16,24 @@
1716
from .connection import get_db_connection
1817
from .registry import execute, executemany, fetchrow, fetchval, query
1918

20-
21-
async def insert_repository(url: str, priority: int = 0) -> str:
22-
"""Insert a new repository"""
23-
sql_query = """
24-
INSERT INTO git.repositories (url, priority, state)
25-
VALUES ($1, $2, 'pending')
26-
RETURNING id
27-
"""
28-
result = await fetchval(sql_query, (url, priority))
29-
return str(result)
30-
31-
32-
async def get_repository_by_url(url: str) -> dict[str, Any] | None:
33-
"""Get repository by URL"""
34-
sql_query = """
35-
SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount"
36-
FROM git.repositories
37-
WHERE url = $1 AND "deletedAt" IS NULL
38-
"""
39-
result = await fetchrow(sql_query, (url,))
40-
return dict(result) if result else None
19+
# Common SELECT columns joining public.repositories + git.repositoryProcessing with aliases for backwards compatibility
20+
REPO_SELECT_COLUMNS = """
21+
r.id,
22+
r.url,
23+
r."segmentId",
24+
r."gitIntegrationId",
25+
r."forkedFrom",
26+
rp.state,
27+
rp.priority,
28+
rp."lockedAt",
29+
rp."lastProcessedAt",
30+
rp."lastProcessedCommit",
31+
rp.branch,
32+
rp."maintainerFile",
33+
rp."lastMaintainerRunAt",
34+
rp."stuckRequiresReOnboard",
35+
rp."reOnboardingCount"
36+
"""
4137

4238

4339
async def get_recently_processed_repository_by_url(url: str) -> Repository | None:
@@ -48,13 +44,14 @@ async def get_recently_processed_repository_by_url(url: str) -> Repository | Non
4844
and has a COMPLETED state.
4945
Used to check if a repository needs reprocessing based on the update interval.
5046
"""
51-
sql_query = """
52-
SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom", "segmentId", "stuckRequiresReOnboard", "reOnboardingCount"
53-
FROM git.repositories
54-
WHERE url = $1
55-
AND "deletedAt" IS NULL
56-
AND "lastProcessedAt" > NOW() - INTERVAL '1 hour' * $2
57-
AND state = $3
47+
sql_query = f"""
48+
SELECT {REPO_SELECT_COLUMNS}
49+
FROM public.repositories r
50+
JOIN git."repositoryProcessing" rp ON rp."repositoryId" = r.id
51+
WHERE r.url = $1
52+
AND r."deletedAt" IS NULL
53+
AND rp."lastProcessedAt" > NOW() - INTERVAL '1 hour' * $2
54+
AND rp.state = $3
5855
"""
5956
result = await fetchrow(
6057
sql_query, (url, REPOSITORY_UPDATE_INTERVAL_HOURS, RepositoryState.COMPLETED)
@@ -63,32 +60,37 @@ async def get_recently_processed_repository_by_url(url: str) -> Repository | Non
6360

6461

6562
async def acquire_onboarding_repo() -> Repository | None:
66-
onboarding_repo_sql_query = """
63+
onboarding_repo_sql_query = f"""
6764
WITH current_onboarding_count AS (
68-
-- Count repositories currently being onboarded (processing + never processed before)
6965
SELECT COUNT(*) as count
70-
FROM git.repositories
71-
WHERE state = $1
72-
AND "lastProcessedCommit" IS NULL
73-
AND "deletedAt" IS NULL
74-
)
75-
UPDATE git.repositories
76-
SET "lockedAt" = NOW(),
77-
state = $1,
78-
"updatedAt" = NOW()
79-
WHERE id = (
66+
FROM git."repositoryProcessing" rp
67+
JOIN public.repositories r ON r.id = rp."repositoryId"
68+
WHERE rp.state = $1
69+
AND rp."lastProcessedCommit" IS NULL
70+
AND r."deletedAt" IS NULL
71+
),
72+
selected_repo AS (
8073
SELECT r.id
81-
FROM git.repositories r
74+
FROM public.repositories r
75+
JOIN git."repositoryProcessing" rp ON rp."repositoryId" = r.id
8276
CROSS JOIN current_onboarding_count c
83-
WHERE r.state = $2
84-
AND r."lockedAt" IS NULL
77+
WHERE rp.state = $2
78+
AND rp."lockedAt" IS NULL
8579
AND r."deletedAt" IS NULL
86-
AND c.count < $3 -- Only proceed if under the limit
87-
ORDER BY r.priority ASC, r."createdAt" ASC
80+
AND c.count < $3
81+
ORDER BY rp.priority ASC, rp."createdAt" ASC
8882
LIMIT 1
89-
FOR UPDATE SKIP LOCKED
83+
FOR UPDATE OF rp SKIP LOCKED
9084
)
91-
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount"
85+
UPDATE git."repositoryProcessing" rp
86+
SET "lockedAt" = NOW(),
87+
state = $1,
88+
"updatedAt" = NOW()
89+
FROM public.repositories r
90+
CROSS JOIN selected_repo
91+
WHERE rp."repositoryId" = r.id
92+
AND rp."repositoryId" = selected_repo.id
93+
RETURNING {REPO_SELECT_COLUMNS}
9294
"""
9395
return await acquire_repository(
9496
onboarding_repo_sql_query,
@@ -122,23 +124,28 @@ async def acquire_repository(query: str, params: tuple = None) -> Repository | N
122124

123125
async def acquire_recurrent_repo() -> Repository | None:
124126
"""Acquire a regular (non-onboarding) repository, that were not processed in the last x hours (REPOSITORY_UPDATE_INTERVAL_HOURS)"""
125-
recurrent_repo_sql_query = """
126-
UPDATE git.repositories
127+
recurrent_repo_sql_query = f"""
128+
WITH selected_repo AS (
129+
SELECT r.id
130+
FROM public.repositories r
131+
JOIN git."repositoryProcessing" rp ON rp."repositoryId" = r.id
132+
WHERE NOT (rp.state = ANY($2))
133+
AND rp."lockedAt" IS NULL
134+
AND r."deletedAt" IS NULL
135+
AND rp."lastProcessedAt" < NOW() - INTERVAL '1 hour' * $3
136+
ORDER BY rp.priority ASC, rp."lastProcessedAt" ASC
137+
LIMIT 1
138+
FOR UPDATE OF rp SKIP LOCKED
139+
)
140+
UPDATE git."repositoryProcessing" rp
127141
SET "lockedAt" = NOW(),
128142
state = $1,
129143
"updatedAt" = NOW()
130-
WHERE id = (
131-
SELECT id
132-
FROM git.repositories
133-
WHERE NOT (state = ANY($2))
134-
AND "lockedAt" IS NULL
135-
AND "deletedAt" IS NULL
136-
AND "lastProcessedAt" < NOW() - INTERVAL '1 hour' * $3
137-
ORDER BY priority ASC, "lastProcessedAt" ASC
138-
LIMIT 1
139-
FOR UPDATE SKIP LOCKED
140-
)
141-
RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch", "forkedFrom", "stuckRequiresReOnboard", "reOnboardingCount"
144+
FROM public.repositories r
145+
CROSS JOIN selected_repo
146+
WHERE rp."repositoryId" = r.id
147+
AND rp."repositoryId" = selected_repo.id
148+
RETURNING {REPO_SELECT_COLUMNS}
142149
"""
143150
states_to_exclude = (
144151
RepositoryState.PENDING,
@@ -196,10 +203,10 @@ async def release_repo(repo_id: str):
196203
Release repository lock (lockedAt) after processing
197204
"""
198205
sql_query = """
199-
UPDATE git.repositories
206+
UPDATE git."repositoryProcessing"
200207
SET "lockedAt" = NULL,
201208
"updatedAt" = NOW()
202-
WHERE id = $1
209+
WHERE "repositoryId" = $1
203210
"""
204211
result = await execute(sql_query, (repo_id,))
205212
return str(result)
@@ -210,34 +217,34 @@ async def update_last_processed_commit(repo_id: str, commit_hash: str, branch: s
210217
Update last processed commit and optionally the branch after processing
211218
"""
212219
sql_query = """
213-
UPDATE git.repositories
220+
UPDATE git."repositoryProcessing"
214221
SET "lastProcessedCommit" = $1,
215222
"branch" = $2,
216223
"updatedAt" = NOW()
217-
WHERE id = $3
224+
WHERE "repositoryId" = $3
218225
"""
219226
result = await execute(sql_query, (commit_hash, branch, repo_id))
220227
return str(result)
221228

222229

223230
async def increase_re_onboarding_count(repo_id: str):
224231
sql_query = """
225-
UPDATE git.repositories
232+
UPDATE git."repositoryProcessing"
226233
SET "reOnboardingCount" = "reOnboardingCount" + 1,
227234
"updatedAt" = NOW()
228-
WHERE id = $1
235+
WHERE "repositoryId" = $1
229236
"""
230237
return await execute(sql_query, (repo_id,))
231238

232239

233240
async def mark_repo_as_processed(repo_id: str, repo_state: RepositoryState):
234241
sql_query = """
235-
UPDATE git.repositories
242+
UPDATE git."repositoryProcessing"
236243
SET "state" = $2,
237244
"lastProcessedAt" = NOW(),
238245
"updatedAt" = NOW(),
239246
"priority" = $3
240-
WHERE id = $1
247+
WHERE "repositoryId" = $1
241248
"""
242249
result = await execute(sql_query, (repo_id, repo_state, RepositoryPriority.NORMAL))
243250
return str(result)
@@ -310,29 +317,19 @@ async def upsert_maintainer(
310317

311318

312319
async def update_maintainer_run(repo_id: str, maintainer_file: str):
313-
# TODO: deprecate githubRepos once all repos migrated to git.repositories
314-
# Update githubRepos table
315-
github_repos_sql_query = """
316-
UPDATE "githubRepos"
317-
SET "maintainerFile" = $1,
318-
"lastMaintainerRunAt" = NOW()
319-
WHERE id = $2
320-
"""
321-
await execute(
322-
github_repos_sql_query,
323-
(maintainer_file, repo_id),
324-
)
325-
326-
# Update git.repositories table
327-
git_repos_sql_query = """
328-
UPDATE git.repositories
320+
"""
321+
Update maintainer file info after processing.
322+
Updates git.repositoryProcessing table only.
323+
"""
324+
sql_query = """
325+
UPDATE git."repositoryProcessing"
329326
SET "maintainerFile" = $1,
330327
"lastMaintainerRunAt" = NOW(),
331328
"updatedAt" = NOW()
332-
WHERE id = $2
333-
"""
329+
WHERE "repositoryId" = $2
330+
"""
334331
await execute(
335-
git_repos_sql_query,
332+
sql_query,
336333
(maintainer_file, repo_id),
337334
)
338335

services/apps/git_integration/src/crowdgit/models/repository.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,9 @@ class Repository(BaseModel):
4646
description="Indicates if the stuck repository is resolved by a re-onboarding",
4747
)
4848
re_onboarding_count: int = Field(
49-
...,
49+
default=0,
5050
description="Tracks the number of times this repository has been re-onboarded. Used to identify unreachable commits via activity.attributes.cycle matching pattern onboarding-{reOnboardingCount}",
5151
)
52-
created_at: datetime = Field(..., description="Creation timestamp")
53-
updated_at: datetime = Field(..., description="Last update timestamp")
5452

5553
@classmethod
5654
def from_db(cls, db_data: dict[str, Any]) -> Repository:
@@ -65,13 +63,11 @@ def from_db(cls, db_data: dict[str, Any]) -> Repository:
6563

6664
# Map database field names to model field names
6765
field_mapping = {
68-
"createdAt": "created_at",
69-
"updatedAt": "updated_at",
7066
"lastProcessedAt": "last_processed_at",
7167
"lastProcessedCommit": "last_processed_commit",
7268
"lockedAt": "locked_at",
7369
"segmentId": "segment_id",
74-
"integrationId": "integration_id",
70+
"gitIntegrationId": "integration_id",
7571
"maintainerFile": "maintainer_file",
7672
"lastMaintainerRunAt": "last_maintainer_run_at",
7773
"forkedFrom": "forked_from",

0 commit comments

Comments
 (0)