Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ public class Config extends ConfigBase {
"Num of thread to handle agent task in agent task thread-pool"})
public static int max_agent_task_threads_num = 4096;

@ConfField(mutable = false, masterOnly = true, description = {"EditLog任务线程池的线程数",
"Num of thread to handle editlog task in editlog task thread-pool"})
public static int editlog_task_threads_num = 8;

@ConfField(description = {"BDBJE 重加入集群时,最多回滚的事务数。如果回滚的事务数超过这个值,"
+ "则 BDBJE 将无法重加入集群,需要手动清理 BDBJE 的数据。",
"The max txn number which bdbje can rollback when trying to rejoin the group. "
Expand Down
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ public boolean isWriteLockHeldByCurrentThread() {
return this.rwLock.writeLock().isHeldByCurrentThread();
}

public <E extends Exception> void readLockOrException(E e) throws E {
readLock();
if (isDropped) {
readUnlock();
throw e;
}
}

public <E extends Exception> void writeLockOrException(E e) throws E {
writeLock();
if (isDropped) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public BalanceStatus isFit(long tabletSize, TStorageMedium medium,
// if this is a supplement task, ignore the storage medium
if (!isSupplement && medium != null && pathStatistic.getStorageMedium() != medium) {
if (LOG.isDebugEnabled()) {
LOG.debug("backend {} path {}'s storage medium {} is not {} storage medium, actual: {}",
LOG.debug("backend {} path {}'s storage medium {} is not {} storage medium",
beId, pathStatistic.getPath(), pathStatistic.getStorageMedium(), medium);
}
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ExecutorService;

public class EditLogExecutor {
private static final Logger LOG = LogManager.getLogger(EditLogExecutor.class);
private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonFixedThreadPool(
Config.editlog_task_threads_num, Config.editlog_task_threads_num * 200, "edit-log-pool", true);

public EditLogExecutor() {
}

public static void submit(Runnable task, Runnable onFailure) {
if (task == null) {
return;
}

Runnable wrappedTask = () -> {
try {
task.run();
} catch (Exception e) {
LOG.error("task execution error", e);
onFailure.run();
}
};

try {
EXECUTOR.submit(wrappedTask);
} catch (Exception e) {
LOG.error("submit error", e);
onFailure.run();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum Status {
SCHEDULE_FAILED, // failed to schedule the tablet, this should only happen in scheduling pending tablets.
RUNNING_FAILED, // failed to running the clone task, this should only happen in handling running tablets.
UNRECOVERABLE, // unable to go on, the tablet should be removed from tablet scheduler.
SUBMITTED, // the redundant replica task has been submitted.
FINISHED // schedule is done, remove the tablet from tablet scheduler with status FINISHED
}

Expand Down
54 changes: 40 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ private long getApproximateTimeoutMs() {
* 1. SCHEDULE_FAILED: will keep the tablet RUNNING.
* 2. UNRECOVERABLE: will remove the tablet from runningTablets.
*/
public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
public void finishCloneTask(TabletScheduler scheduler, CloneTask cloneTask, TFinishTaskRequest request)
throws SchedException {
Preconditions.checkState(state == State.RUNNING, state);
Preconditions.checkArgument(cloneTask.getTaskVersion() == CloneTask.VERSION_2);
Expand Down Expand Up @@ -1120,7 +1120,7 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
OlapTable olapTable = (OlapTable) db.getTableOrException(tblId,
s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
"tbl " + tabletId + " does not exist"));
olapTable.writeLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
olapTable.readLockOrException(new SchedException(Status.UNRECOVERABLE, "table "
+ olapTable.getName() + " does not exist"));
try {
Partition partition = olapTable.getPartition(partitionId);
Expand Down Expand Up @@ -1223,29 +1223,55 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
replica.getLastFailedVersion(),
replica.getLastSuccessVersion());

Runnable onFailure = () -> {
scheduler.getStat().counterTabletScheduledFailed.incrementAndGet();
scheduler.finalizeTabletCtx(this, TabletSchedCtx.State.CANCELLED, Status.SCHEDULE_FAILED,
"failed to submit edit log task");
};

if (replica.getState() == ReplicaState.CLONE) {
replica.setState(ReplicaState.NORMAL);
Env.getCurrentEnv().getEditLog().logAddReplica(info);

EditLogExecutor.submit(() -> {
Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
scheduler.getStat().counterCloneTaskSucceeded.incrementAndGet();
scheduler.gatherStatistics(this);
scheduler.finalizeTabletCtx(this, TabletSchedCtx.State.FINISHED, Status.FINISHED,
"log delete replica finished");
LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {},"
+ " is catchup {}",
this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
}, onFailure);
} else {
// if in VERSION_INCOMPLETE, replica is not newly created, thus the state is not CLONE
// so we keep it state unchanged, and log update replica
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
EditLogExecutor.submit(() -> {
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
scheduler.getStat().counterCloneTaskSucceeded.incrementAndGet();
scheduler.gatherStatistics(this);
scheduler.finalizeTabletCtx(this, TabletSchedCtx.State.FINISHED, Status.FINISHED,
"log update replica finished");
LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {},"
+ " is catchup {}",
this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
}, onFailure);
}

state = State.FINISHED;
LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {}, is catchup {}",
this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
// TabletScheduler::finishCloneTask will handle this exception. If the task status is SUBMITTED,
// no need to run finalizeTabletCtx
throw new SchedException(Status.SUBMITTED, "edit log task is submitted");
} finally {
olapTable.writeUnlock();
}
olapTable.readUnlock();

if (request.isSetCopySize()) {
this.copySize = request.getCopySize();
}
if (request.isSetCopySize()) {
this.copySize = request.getCopySize();
}

if (request.isSetCopyTimeMs()) {
this.copyTimeMs = request.getCopyTimeMs();
if (request.isSetCopyTimeMs()) {
this.copyTimeMs = request.getCopyTimeMs();
}
}

}

public boolean isTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,18 +445,24 @@ private void schedulePendingTablets() {
stat.counterTabletScheduledFailed.incrementAndGet();
addBackToPendingTablets(tabletCtx);
}
continue;
} else if (e.getStatus() == Status.FINISHED) {
// schedule redundant tablet or scheduler disabled will throw this exception
stat.counterTabletScheduledSucceeded.incrementAndGet();
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getStatus(), e.getMessage());
continue;
} else if (e.getStatus() == Status.SUBMITTED) {
// The SUBMITTED status will only occur when scheduling tablets that are of
// the REDUNDANT or FORCE_REDUNDANT.
continue;
} else {
Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus());
// discard
stat.counterTabletScheduledDiscard.incrementAndGet();
tabletCtx.setSchedFailedCode(e.getSubCode());
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
continue;
}
continue;
} catch (Exception e) {
LOG.warn("got unexpected exception, discard this schedule. tablet: {}",
tabletCtx.getTabletId(), e);
Expand Down Expand Up @@ -524,7 +530,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
OlapTable tbl = (OlapTable) db.getTableOrException(tabletCtx.getTblId(),
s -> new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE,
"tbl " + tabletCtx.getTblId() + " does not exist"));
tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE,
tbl.readLockOrException(new SchedException(Status.UNRECOVERABLE,
"table " + tbl.getName() + " does not exist"));
try {
long tabletId = tabletCtx.getTabletId();
Expand Down Expand Up @@ -649,7 +655,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)

handleTabletByTypeAndStatus(tabletHealth.status, tabletCtx, batchTask);
} finally {
tbl.writeUnlock();
tbl.readUnlock();
}
}

Expand Down Expand Up @@ -896,9 +902,9 @@ private void handleRedundantReplica(TabletSchedCtx tabletCtx, boolean force) thr
|| deleteReplicaOnUrgentHighDisk(tabletCtx, force)
|| deleteFromScaleInDropReplicas(tabletCtx, force)
|| deleteReplicaOnHighLoadBackend(tabletCtx, force)) {
// if we delete at least one redundant replica, we still throw a SchedException with status FINISHED
// to remove this tablet from the pendingTablets(consider it as finished)
throw new SchedException(Status.FINISHED, "redundant replica is deleted");
// if we will delete at least one redundant replica, an async task will be created to handle it.
// we need to throw a SchedException with status SUBMITTED to indicate the task has been submitted.
throw new SchedException(Status.SUBMITTED, "redundant replica task is submitted");
}
throw new SchedException(Status.UNRECOVERABLE, "unable to delete any redundant replicas");
}
Expand Down Expand Up @@ -1154,7 +1160,7 @@ private boolean handleColocateRedundant(TabletSchedCtx tabletCtx) throws SchedEx

// If the replica is not in ColocateBackendsSet or is bad, delete it.
deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false);
throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted");
throw new SchedException(Status.SUBMITTED, "delete colocate redundant replica task is submitted");
}
throw new SchedException(Status.UNRECOVERABLE, "unable to delete any colocate redundant replicas");
}
Expand Down Expand Up @@ -1284,10 +1290,18 @@ private void deleteReplicaInternal(TabletSchedCtx tabletCtx,
tabletCtx.getTabletId(),
beId);

Env.getCurrentEnv().getEditLog().logDeleteReplica(info);

LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}",
tabletCtx.getTabletId(), beId, reason, force);
EditLogExecutor.submit(() -> {
Env.getCurrentEnv().getEditLog().logDeleteReplica(info);
LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}",
tabletCtx.getTabletId(), beId, reason, force);
stat.counterTabletScheduledSucceeded.incrementAndGet();
gatherStatistics(tabletCtx);
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "write edit log finished");
}, () -> {
stat.counterTabletScheduledFailed.incrementAndGet();
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.SCHEDULE_FAILED,
"failed to write edit log");
});
}

private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) {
Expand Down Expand Up @@ -1638,7 +1652,7 @@ private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
addTablet(tabletCtx, true /* force */);
}

private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, Status status, String reason) {
if (state == TabletSchedCtx.State.CANCELLED || state == TabletSchedCtx.State.UNEXPECTED) {
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE
&& tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
Expand Down Expand Up @@ -1866,7 +1880,7 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)

Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
try {
tabletCtx.finishCloneTask(cloneTask, request);
tabletCtx.finishCloneTask(this, cloneTask, request);
} catch (SchedException e) {
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.RUNNING_FAILED) {
Expand All @@ -1890,6 +1904,9 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
stat.counterTabletScheduledDiscard.incrementAndGet();
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
return true;
} else if (e.getStatus() == Status.SUBMITTED) {
// no finalizeTabletCtx
return true;
} else if (e.getStatus() == Status.FINISHED) {
// tablet is already healthy, just remove
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getStatus(), e.getMessage());
Expand All @@ -1915,7 +1932,7 @@ public boolean finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
* It will be evaluated for future strategy.
* This should only be called when the tablet is down with state FINISHED.
*/
private void gatherStatistics(TabletSchedCtx tabletCtx) {
void gatherStatistics(TabletSchedCtx tabletCtx) {
if (tabletCtx.getCopySize() > 0 && tabletCtx.getCopyTimeMs() > 0) {
if (tabletCtx.getSrcBackendId() != -1 && tabletCtx.getSrcPathHash() != -1) {
PathSlot pathSlot = backendsWorkingSlots.get(tabletCtx.getSrcBackendId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testPartitionRebalancer() {
TFinishTaskRequest fakeReq = new TFinishTaskRequest();
fakeReq.task_status = new TStatus(TStatusCode.OK);
fakeReq.finish_tablet_infos = Lists.newArrayList(new TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
tabletSchedCtx.finishCloneTask(tabletScheduler, (CloneTask) task, fakeReq);
} catch (SchedException e) {
e.printStackTrace();
}
Expand Down
Loading