Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions runtime/ops/filter/img_similar_images_cleaner/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ def determine_similar_images(self, file_features: List, p_hash: str, des_matrix:
similarity = round(result, 2)
if similarity >= self.similar_threshold:
logger.info(
"fileName: %s, method: ImgSimilarCleaner, dataset: %s. This picture is similar to %s, "
"and the similarity is %.4f. The picture is filtered.", file_name, self.task_uuid,
file_name_decoded, similarity)
f"fileName: {file_name}, method: ImgSimilarCleaner, dataset: {self.task_uuid}. "
f"This picture is similar to {file_name_decoded}, "
f"and the similarity is {similarity:.4f}. The picture is filtered."
)
return True
return False

Expand Down
26 changes: 15 additions & 11 deletions runtime/python-executor/datamate/operator_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import uvicorn
import yaml
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from jsonargparse import ArgumentParser
Expand All @@ -25,7 +26,20 @@
enqueue=True
)

app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook.

    Starts the background auto-annotation worker before the app begins
    serving requests, and logs when the app is shutting down.
    """
    try:
        logger.info("Initializing background worker...")
        start_auto_annotation_worker()
        logger.info("Auto-annotation worker started successfully.")
    except Exception as exc:
        # A failed worker start is logged but does not prevent the API
        # from serving requests.
        logger.error("Failed to start auto-annotation worker: {}", exc)

    yield  # the application handles requests while suspended here

    logger.info("Shutting down background worker...")

app = FastAPI(lifespan=lifespan)


class APIException(Exception):
Expand All @@ -50,16 +64,6 @@ def to_dict(self) -> Dict[str, Any]:
return result


@app.on_event("startup")
async def startup_event():
"""FastAPI 启动时初始化后台自动标注 worker。"""

try:
start_auto_annotation_worker()
except Exception as e: # pragma: no cover - 防御性日志
logger.error("Failed to start auto-annotation worker: {}", e)


@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
return JSONResponse(
Expand Down
83 changes: 55 additions & 28 deletions runtime/python-executor/datamate/sql_manager/sql_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,77 @@
import os
import time
from random import uniform
from threading import Lock

from loguru import logger
from sqlalchemy import create_engine, inspect
from sqlalchemy import create_engine
from sqlalchemy.engine import URL


class SQLManager:
    """Process-wide holder for a single SQLAlchemy Engine (connection pool).

    The Engine is created lazily on first use and shared by all callers,
    so the whole process uses one connection pool.
    """

    _engine = None   # lazily-created singleton Engine
    _lock = Lock()   # guards engine creation in multi-threaded use

    @classmethod
    def _get_engine(cls):
        """Return the singleton Engine, creating it on first use.

        Double-checked locking (fast path outside the lock, re-check
        inside) ensures exactly one Engine is built even when several
        threads race on first access.

        :return: the shared SQLAlchemy Engine
        """
        if cls._engine is not None:
            return cls._engine

        with cls._lock:
            if cls._engine is not None:
                return cls._engine

            # Build the PostgreSQL connection URL from environment
            # variables, with development-friendly defaults.
            connection_url = URL.create(
                drivername="postgresql+psycopg2",
                username=os.getenv("PG_USER", "postgres"),
                password=os.getenv("PG_PASSWORD", "password"),
                host=os.getenv("PG_HOST", "datamate-database"),
                port=int(os.getenv("PG_PORT", 5432)),
                database=os.getenv("PG_DATABASE", "datamate"),
            )

            # Create the engine exactly once.
            # NOTE: AUTOCOMMIT is convenient but forfeits transactional
            # rollback; if two tables must be updated atomically, this
            # isolation level cannot roll the first update back.
            cls._engine = create_engine(
                connection_url,
                pool_pre_ping=True,           # validate connections before handing them out
                isolation_level="AUTOCOMMIT",
                pool_size=5,                  # explicit pool size
                max_overflow=15,              # explicit overflow limit
                pool_timeout=30,
                pool_recycle=1800,            # recycle connections after 30 minutes
            )
            logger.info("Database Engine initialized successfully.")
            return cls._engine

    @staticmethod
    def create_connect(max_retries=5, base_delay=1):
        """Borrow a connection from the shared Engine's pool, with retries.

        Retries use exponential backoff capped at 30 seconds, plus a
        random jitter of up to ±25% to avoid thundering-herd reconnects.

        :param max_retries: maximum number of connection attempts
        :param base_delay: base delay in seconds for the backoff schedule
        :return: a SQLAlchemy Connection checked out from the pool
        :raises Exception: re-raises the last connection error once
            ``max_retries`` attempts have failed
        """
        attempt = 0
        while True:
            try:
                # 1. Get (or lazily create) the global engine.
                engine = SQLManager._get_engine()
                # 2. Borrow a connection from its pool.
                return engine.connect()
            except Exception as e:
                attempt += 1
                logger.error(f"Connection attempt {attempt} failed: {str(e)}")

                if attempt >= max_retries:
                    logger.error("Max retries reached. Could not connect to database.")
                    raise

                # Backoff: 2^(attempt-1) * base_delay, capped at 30s, jittered.
                wait_time = min(30, base_delay * (2 ** (attempt - 1)))
                jitter = uniform(-wait_time / 4, wait_time / 4)
                sleep_time = wait_time + jitter
                logger.info(f"Retrying in {sleep_time:.2f} seconds...")
                time.sleep(sleep_time)
Loading