2 changes: 1 addition & 1 deletion synch/common.py
@@ -50,5 +50,5 @@ def insert_log(

def cluster_sql(cluster_name: str = None):
if cluster_name:
return f" on cluster {cluster_name} "
return f" on cluster {cluster_name}"
return ""
2 changes: 1 addition & 1 deletion synch/factory.py
@@ -134,7 +134,7 @@ def init_monitor_db(cluster_name: str = None):
sql_create_db = f"create database if not exists synch {cluster_sql(cluster_name)}"
writer.execute(sql_create_db)
if cluster_name:
engine = "ReplicatedMergeTree('/clickhouse/tables/{{shard}}/synch/log','{{replica}}')"
engine = f"ReplicatedMergeTree('/clickhouse/tables/{{shard}}/synch/log','{{replica}}')"
else:
engine = "ReplacingMergeTree"
sql_create_tb = f"""create table if not exists synch.log {cluster_sql(cluster_name)}
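A quick illustration of why the engine literal gains an f prefix: inside an f-string the doubled braces collapse to single braces, yielding the {shard}/{replica} macros ClickHouse substitutes per replica (sketch, not repo code).

engine = f"ReplicatedMergeTree('/clickhouse/tables/{{shard}}/synch/log','{{replica}}')"
print(engine)
# ReplicatedMergeTree('/clickhouse/tables/{shard}/synch/log','{replica}')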
39 changes: 32 additions & 7 deletions synch/reader/mysql.py
@@ -10,6 +10,7 @@
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent

from synch.broker import Broker
from synch.common import cluster_sql
from synch.convert import SqlConvert
from synch.reader import Reader
from synch.redis_mixin import RedisLogPos
@@ -19,7 +20,6 @@
class Mysql(Reader):
only_events = (DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, QueryEvent)
fix_column_type = True

def __init__(self, alias):
super().__init__(alias)
source_db = self.source_db
@@ -136,6 +136,7 @@ def _binlog_reading(
skip_delete_tables,
skip_update_tables,
) -> Generator:
sequence = 0
stream = BinLogStreamReader(
connection_settings=dict(
host=self.host, port=self.port, user=self.user, passwd=self.password,
@@ -152,13 +153,16 @@
slave_heartbeat=10,
)
for binlog_event in stream:
sequence += 1
if isinstance(binlog_event, QueryEvent):
schema = binlog_event.schema.decode()
query = binlog_event.query.lower()
cluster_name = Settings.cluster_name()
suffix = Settings.get("clickhouse").get("distributed_suffix")
if "alter" not in query:
continue
table, convent_sql = SqlConvert.to_clickhouse(
schema, query, Settings.cluster_name()
schema, query, cluster_name
)
if not convent_sql:
continue
@@ -168,9 +172,30 @@
"action": "query",
"values": {"query": convent_sql},
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": 0,
"action_seq": sequence,
}
yield schema, None, event, stream.log_file, stream.log_pos
if Settings.is_cluster():
drop_sql = f"drop table {schema}.{table}{suffix}{cluster_sql(cluster_name)}"
drop_event = {
"table": table,
"schema": schema,
"action": "query",
"values": {"query": drop_sql},
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": sequence,
}
yield schema, None, drop_event, stream.log_file, stream.log_pos
create_sql = f"create table if not exists {schema}.{table}{suffix}{cluster_sql(cluster_name)} AS {schema}.{table} ENGINE = Distributed({cluster_name},{schema},{table},rand())"
create_event = {
"table": table,
"schema": schema,
"action": "query",
"values": {"query": create_sql},
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": sequence,
}
yield schema, None, create_event, stream.log_file, stream.log_pos
else:
schema = binlog_event.schema
table = binlog_event.table
@@ -183,7 +208,7 @@
"action": "insert",
"values": row["values"],
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": 2,
"action_seq": sequence,
}

elif isinstance(binlog_event, UpdateRowsEvent):
@@ -195,7 +220,7 @@
"action": "delete",
"values": row["before_values"],
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": 1,
"action_seq": sequence,
}
yield binlog_event.schema, binlog_event.table, delete_event, stream.log_file, stream.log_pos
event = {
@@ -204,7 +229,7 @@
"action": "insert",
"values": row["after_values"],
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": 2,
"action_seq": sequence,
}

elif isinstance(binlog_event, DeleteRowsEvent):
Expand All @@ -216,7 +241,7 @@ def _binlog_reading(
"action": "delete",
"values": row["values"],
"event_unixtime": int(time.time() * 10 ** 6),
"action_seq": 1,
"action_seq": sequence,
}
else:
return
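To make the new cluster branch concrete, here is roughly what the two extra query events carry when Settings.is_cluster() is true, with made-up schema, table, suffix, and cluster names (a sketch, not output captured from the reader). Presumably the point is to rebuild the Distributed proxy table against the freshly altered local tables.

schema, table, suffix, cluster_name = "shop", "orders", "_all", "ch_cluster"
drop_sql = f"drop table {schema}.{table}{suffix} on cluster {cluster_name}"
create_sql = (
    f"create table if not exists {schema}.{table}{suffix} on cluster {cluster_name} "
    f"AS {schema}.{table} ENGINE = Distributed({cluster_name},{schema},{table},rand())"
)
# drop table shop.orders_all on cluster ch_cluster
# create table if not exists shop.orders_all on cluster ch_cluster AS shop.orders ENGINE = Distributed(ch_cluster,shop,orders,rand())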
4 changes: 4 additions & 0 deletions synch/replication/continuous.py
@@ -112,6 +112,10 @@ def continuous_etl(
else:
delete_pks = []
if insert:
# drop pending inserts that a later delete has superseded
for pk_v, value in delete.items():
if pk_v in insert and insert[pk_v]["action_seq"] < value["action_seq"]:
insert.pop(pk_v)
insert_events = list(
sorted(insert.values(), key=lambda x: x.get("event_unixtime"))
)
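A small self-contained sketch of the new rule, using hypothetical buffered events keyed by primary key: an insert is dropped only when a delete for the same key carries a higher action_seq, i.e. it happened later in the binlog.

insert = {1: {"values": {"id": 1}, "action_seq": 5}, 2: {"values": {"id": 2}, "action_seq": 7}}
delete = {1: {"values": {"id": 1}, "action_seq": 9}}  # pk 1 was inserted, then deleted

for pk_v, value in delete.items():
    if pk_v in insert and insert[pk_v]["action_seq"] < value["action_seq"]:
        insert.pop(pk_v)

assert list(insert) == [2]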
5 changes: 3 additions & 2 deletions synch/replication/etl.py
@@ -4,6 +4,7 @@

from synch.factory import get_reader, get_writer
from synch.settings import Settings
from synch.common import cluster_sql


def etl_full(
@@ -36,7 +37,7 @@ def etl_full(
elif isinstance(pk, tuple):
pk = f"({','.join(pk)})"
if renew:
drop_sql = f"drop table if exists {schema}.{table_name}"
drop_sql = f"drop table if exists {schema}.{table_name} {cluster_sql(Settings.cluster_name())}"
writer.execute(drop_sql)
logger.info(f"drop table success:{schema}.{table_name}")
if not writer.check_table_exists(schema, table_name):
@@ -58,7 +59,7 @@
for w in get_writer(choice=False):
w.execute(
w.get_distributed_table_create_sql(
schema, table_name, Settings.get("clickhouse.distributed_suffix")
schema, table_name, Settings.get("clickhouse").get("distributed_suffix")
)
)
if reader.fix_column_type and not table.get("skip_decimal"):
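The Settings change mirrors the lookup style already used in mysql.py above; assuming the clickhouse section is stored as a nested mapping, a dotted key would not resolve (plain-dict analogy, not the actual Settings implementation).

settings = {"clickhouse": {"distributed_suffix": "_all"}}
settings.get("clickhouse.distributed_suffix")              # None: there is no flat key containing a dot
settings.get("clickhouse", {}).get("distributed_suffix")   # "_all"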
2 changes: 1 addition & 1 deletion synch/writer/collapsing_merge_tree.py
@@ -29,7 +29,7 @@ def get_table_create_sql(
partition_by_str = f" PARTITION BY {partition_by} "
if engine_settings:
engine_settings_str = f" SETTINGS {engine_settings} "
return f"CREATE TABLE {schema}.{table}{cluster_sql(self.cluster_name)}ENGINE = {self.engine}({sign_column}) {partition_by_str} ORDER BY {pk} {engine_settings_str} AS {select_sql} limit 0"
return f"CREATE TABLE {schema}.{table}{cluster_sql(self.cluster_name)} ENGINE = {self.engine}({sign_column}) {partition_by_str} ORDER BY {pk} {engine_settings_str} AS {select_sql} limit 0"

def get_full_insert_sql(self, reader: Reader, schema: str, table: str, sign_column: str = None):
return f"insert into {schema}.{table} {reader.get_source_select_sql(schema, table, sign_column)}"
5 changes: 3 additions & 2 deletions synch/writer/merge_tree.py
@@ -2,6 +2,7 @@

from loguru import logger

from synch.settings import Settings
from synch.common import cluster_sql
from synch.enums import ClickHouseEngine
from synch.reader import Reader
@@ -17,7 +18,7 @@ def delete_events(self, schema: str, table: str, pk: Union[tuple, str], pk_list:
"""
params = None
if isinstance(pk, tuple):
sql = f"alter table {schema}.{table} delete where "
sql = f"alter table {schema}.{table}{cluster_sql(Settings.cluster_name())} delete where "
pks_list = []
for pk_value in pk_list:
item = []
@@ -31,7 +32,7 @@
sql += " or ".join(pks_list)
else:
params = {"pks": tuple(pk_list)}
sql = f"alter table {schema}.{table} delete where {pk} in %(pks)s"
sql = f"alter table {schema}.{table}{cluster_sql(Settings.cluster_name())} delete where {pk} in %(pks)s"
self.execute(sql, params)
return sql, params

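For reference, the shape of the mutation the writer now issues when a cluster name is configured (hypothetical table and cluster names; the %(name)s parameter style is the one already used in this method).

sql = "alter table shop.orders on cluster ch_cluster delete where id in %(pks)s"
params = {"pks": (1, 2, 3)}
# self.execute(sql, params)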