Skip to content

Commit 0a1d6fb

Browse files
committed
temp
1 parent 3e29642 commit 0a1d6fb

File tree

7 files changed

+141
-32
lines changed

7 files changed

+141
-32
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,10 @@ public class Config extends ConfigBase {
357357
"Num of thread to handle agent task in agent task thread-pool"})
358358
public static int max_agent_task_threads_num = 4096;
359359

360+
@ConfField(mutable = false, masterOnly = true, description = {"EditLog任务线程池的线程数",
361+
"Num of thread to handle editlog task in editlog task thread-pool"})
362+
public static int editlog_task_threads_num = 8;
363+
360364
@ConfField(description = {"BDBJE 重加入集群时,最多回滚的事务数。如果回滚的事务数超过这个值,"
361365
+ "则 BDBJE 将无法重加入集群,需要手动清理 BDBJE 的数据。",
362366
"The max txn number which bdbje can rollback when trying to rejoin the group. "

fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,14 @@ public boolean isWriteLockHeldByCurrentThread() {
271271
return this.rwLock.writeLock().isHeldByCurrentThread();
272272
}
273273

274+
public <E extends Exception> void readLockOrException(E e) throws E {
275+
readLock();
276+
if (isDropped) {
277+
readUnlock();
278+
throw e;
279+
}
280+
}
281+
274282
public <E extends Exception> void writeLockOrException(E e) throws E {
275283
writeLock();
276284
if (isDropped) {

fe/fe-core/src/main/java/org/apache/doris/clone/EditLogExecutor.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.clone;
19+
20+
import org.apache.doris.common.Config;
21+
import org.apache.doris.common.ThreadPoolManager;
22+
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
25+
26+
import java.util.concurrent.ExecutorService;
27+
28+
public class EditLogExecutor {
29+
private static final Logger LOG = LogManager.getLogger(EditLogExecutor.class);
30+
private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonFixedThreadPool(
31+
Config.editlog_task_threads_num, Config.editlog_task_threads_num * 200, "edit-log-pool", true);
32+
33+
public EditLogExecutor() {
34+
}
35+
36+
public static void submit(Runnable task, Runnable onFailure) {
37+
if (task == null) {
38+
return;
39+
}
40+
41+
Runnable wrappedTask = () -> {
42+
try {
43+
task.run();
44+
} catch (Exception e) {
45+
LOG.error("task execution error", e);
46+
onFailure.run();
47+
}
48+
};
49+
50+
try {
51+
EXECUTOR.submit(wrappedTask);
52+
} catch (Exception e) {
53+
LOG.error("submit error", e);
54+
onFailure.run();
55+
}
56+
}
57+
58+
}

fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public enum Status {
2424
SCHEDULE_FAILED, // failed to schedule the tablet, this should only happen in scheduling pending tablets.
2525
RUNNING_FAILED, // failed to running the clone task, this should only happen in handling running tablets.
2626
UNRECOVERABLE, // unable to go on, the tablet should be removed from tablet scheduler.
27+
SUBMITTED, // the redundant replica task has been submitted.
2728
FINISHED // schedule is done, remove the tablet from tablet scheduler with status FINISHED
2829
}
2930

fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,7 @@ private long getApproximateTimeoutMs() {
10851085
* 1. SCHEDULE_FAILED: will keep the tablet RUNNING.
10861086
* 2. UNRECOVERABLE: will remove the tablet from runningTablets.
10871087
*/
1088-
public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
1088+
public void finishCloneTask(TabletScheduler scheduler, CloneTask cloneTask, TFinishTaskRequest request)
10891089
throws SchedException {
10901090
Preconditions.checkState(state == State.RUNNING, state);
10911091
Preconditions.checkArgument(cloneTask.getTaskVersion() == CloneTask.VERSION_2);
@@ -1120,7 +1120,7 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
11201120
OlapTable olapTable = (OlapTable) db.getTableOrException(tblId,
11211121
s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
11221122
"tbl " + tabletId + " does not exist"));
1123-
olapTable.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
1123+
olapTable.readLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
11241124
+ olapTable.getName() + " does not exist"));
11251125
try {
11261126
Partition partition = olapTable.getPartition(partitionId);
@@ -1223,29 +1223,55 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
12231223
replica.getLastFailedVersion(),
12241224
replica.getLastSuccessVersion());
12251225

1226+
// Record the copy statistics BEFORE handing work to the edit log executor: the async task calls
// scheduler.gatherStatistics(this), which checks getCopySize()/getCopyTimeMs(), and it may run
// before this method reaches any later assignment of these fields.
if (request.isSetCopySize()) {
    this.copySize = request.getCopySize();
}
if (request.isSetCopyTimeMs()) {
    this.copyTimeMs = request.getCopyTimeMs();
}

// Invoked by EditLogExecutor when the edit log task throws or cannot be submitted.
Runnable onFailure = () -> {
    scheduler.getStat().counterTabletScheduledFailed.incrementAndGet();
    scheduler.finalizeTabletCtx(this, TabletSchedCtx.State.CANCELLED, Status.SCHEDULE_FAILED,
            "failed to submit edit log task");
};

if (replica.getState() == ReplicaState.CLONE) {
    replica.setState(ReplicaState.NORMAL);

    EditLogExecutor.submit(() -> {
        // The replica was newly created by the clone, so it must be journaled as an ADD.
        // (Writing a delete-replica record here would journal the fresh replica as removed.)
        Env.getCurrentEnv().getEditLog().logAddReplica(info);
        scheduler.getStat().counterCloneTaskSucceeded.incrementAndGet();
        scheduler.gatherStatistics(this);
        scheduler.finalizeTabletCtx(this, TabletSchedCtx.State.FINISHED, Status.FINISHED,
                "log add replica finished");
        LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {},"
                        + " is catchup {}",
                this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
    }, onFailure);
} else {
    // if in VERSION_INCOMPLETE, replica is not newly created, thus the state is not CLONE
    // so we keep it state unchanged, and log update replica
    EditLogExecutor.submit(() -> {
        Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
        scheduler.getStat().counterCloneTaskSucceeded.incrementAndGet();
        scheduler.gatherStatistics(this);
        scheduler.finalizeTabletCtx(this, TabletSchedCtx.State.FINISHED, Status.FINISHED,
                "log update replica finished");
        LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {},"
                        + " is catchup {}",
                this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
    }, onFailure);
}

// TabletScheduler::finishCloneTask will handle this exception. If the task status is SUBMITTED,
// no need to run finalizeTabletCtx there: the submitted task (or onFailure) finalizes the ctx.
throw new SchedException(Status.SUBMITTED, "edit log task is submitted");
12381263
} finally {
1239-
olapTable.writeUnlock();
1240-
}
1264+
olapTable.readUnlock();
12411265

1242-
if (request.isSetCopySize()) {
1243-
this.copySize = request.getCopySize();
1244-
}
1266+
if (request.isSetCopySize()) {
1267+
this.copySize = request.getCopySize();
1268+
}
12451269

1246-
if (request.isSetCopyTimeMs()) {
1247-
this.copyTimeMs = request.getCopyTimeMs();
1270+
if (request.isSetCopyTimeMs()) {
1271+
this.copyTimeMs = request.getCopyTimeMs();
1272+
}
12481273
}
1274+
12491275
}
12501276

12511277
public boolean isTimeout() {

fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -445,18 +445,19 @@ private void schedulePendingTablets() {
445445
stat.counterTabletScheduledFailed.incrementAndGet();
446446
addBackToPendingTablets(tabletCtx);
447447
}
448-
} else if (e.getStatus() == Status.FINISHED) {
449-
// schedule redundant tablet or scheduler disabled will throw this exception
450-
stat.counterTabletScheduledSucceeded.incrementAndGet();
451-
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getStatus(), e.getMessage());
448+
continue;
449+
} else if (e.getStatus() == Status.SUBMITTED) {
450+
// The SUBMITTED status will only occur when scheduling tablets that are of
451+
// the REDUNDANT or FORCE_REDUNDANT.
452+
continue;
452453
} else {
453454
Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus());
454455
// discard
455456
stat.counterTabletScheduledDiscard.incrementAndGet();
456457
tabletCtx.setSchedFailedCode(e.getSubCode());
457458
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
459+
continue;
458460
}
459-
continue;
460461
} catch (Exception e) {
461462
LOG.warn("got unexpected exception, discard this schedule. tablet: {}",
462463
tabletCtx.getTabletId(), e);
@@ -524,7 +525,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
524525
OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(),
525526
s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
526527
"tbl " + tabletCtx.getTblId() + " does not exist"));
527-
tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE,
528+
tbl.readLockOrException(new SchedException(Status.UNRECOVERABLE,
528529
"table " + tbl.getName() + " does not exist"));
529530
try {
530531
long tabletId = tabletCtx.getTabletId();
@@ -649,7 +650,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
649650

650651
handleTabletByTypeAndStatus(tabletHealth.status, tabletCtx, batchTask);
651652
} finally {
652-
tbl.writeUnlock();
653+
tbl.readUnlock();
653654
}
654655
}
655656

@@ -896,9 +897,9 @@ private void handleRedundantReplica(TabletSchedCtx tabletCtx, boolean force) thr
896897
|| deleteReplicaOnUrgentHighDisk(tabletCtx, force)
897898
|| deleteFromScaleInDropReplicas(tabletCtx, force)
898899
|| deleteReplicaOnHighLoadBackend(tabletCtx, force)) {
899-
// if we delete at least one redundant replica, we still throw a SchedException with status FINISHED
900-
// to remove this tablet from the pendingTablets(consider it as finished)
901-
throw new SchedException(Status.FINISHED, "redundant replica is deleted");
900+
// if we will delete at least one redundant replica, an async task will be created to handle it.
901+
// we need to throw a SchedException with status SUBMITTED to indicate the task has been submitted.
902+
throw new SchedException(Status.SUBMITTED, "redundant replica task is submitted");
902903
}
903904
throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas");
904905
}
@@ -1284,10 +1285,18 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx,
12841285
tabletCtx.getTabletId(),
12851286
beId);
12861287

1287-
Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
1288-
1289-
LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}",
1290-
tabletCtx.getTabletId(), beId, reason, force);
1288+
EditLogExecutor.submit(() -> {
1289+
Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
1290+
LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}",
1291+
tabletCtx.getTabletId(), beId, reason, force);
1292+
stat.counterTabletScheduledSucceeded.incrementAndGet();
1293+
gatherStatistics(tabletCtx);
1294+
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "write edit log finished");
1295+
}, () -> {
1296+
stat.counterTabletScheduledFailed.incrementAndGet();
1297+
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.SCHEDULE_FAILED,
1298+
"failed to write edit log");
1299+
});
12911300
}
12921301

12931302
private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) {
@@ -1638,7 +1647,7 @@ private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
16381647
addTablet(tabletCtx, true /* force */);
16391648
}
16401649

1641-
private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
1650+
void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
16421651
if (state == TabletSchedCtx.State.CANCELLED || state == TabletSchedCtx.State.UNEXPECTED) {
16431652
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
16441653
&& tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
@@ -1866,7 +1875,7 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
18661875

18671876
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
18681877
try {
1869-
tabletCtx.finishCloneTask(cloneTask, request);
1878+
tabletCtx.finishCloneTask(this, cloneTask, request);
18701879
} catch (SchedException e) {
18711880
tabletCtx.setErrMsg(e.getMessage());
18721881
if (e.getStatus() == Status.RUNNING_FAILED) {
@@ -1890,6 +1899,9 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
18901899
stat.counterTabletScheduledDiscard.incrementAndGet();
18911900
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
18921901
return true;
1902+
} else if (e.getStatus() == Status.SUBMITTED) {
1903+
// no finalizeTabletCtx
1904+
return true;
18931905
} else if (e.getStatus() == Status.FINISHED) {
18941906
// tablet is already healthy, just remove
18951907
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
@@ -1915,7 +1927,7 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
19151927
* It will be evaluated for future strategy.
19161928
* This should only be called when the tablet is down with state FINISHED.
19171929
*/
1918-
private void gatherStatistics(TabletSchedCtx tabletCtx) {
1930+
void gatherStatistics(TabletSchedCtx tabletCtx) {
19191931
if (tabletCtx.getCopySize() > 0 && tabletCtx.getCopyTimeMs() > 0) {
19201932
if (tabletCtx.getSrcBackendId() != -1 && tabletCtx.getSrcPathHash() != -1) {
19211933
PathSlot pathSlot = backendsWorkingSlots.get(tabletCtx.getSrcBackendId());

fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public void testPartitionRebalancer() {
287287
TFinishTaskRequest fakeReq = new TFinishTaskRequest();
288288
fakeReq.task_status = new TStatus(TStatusCode.OK);
289289
fakeReq.finish_tablet_infos = Lists.newArrayList(new TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
290-
tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
290+
tabletSchedCtx.finishCloneTask(tabletScheduler, (CloneTask) task, fakeReq);
291291
} catch (SchedException e) {
292292
e.printStackTrace();
293293
}

0 commit comments

Comments
 (0)