Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Getter
@Setter
@SerializedName("fr")
protected FailureReason failureReason;
protected volatile FailureReason failureReason;
@Getter
@Setter
protected long latestAutoResumeTimestamp;
Expand Down Expand Up @@ -505,7 +505,7 @@ public List<AbstractStreamingTask> queryAllStreamTasks() {
return tasks;
}

protected void fetchMeta() {
protected void fetchMeta() throws JobException {
try {
if (tvfType != null) {
if (originTvfProps == null) {
Expand All @@ -516,10 +516,18 @@ protected void fetchMeta() {
offsetProvider.fetchRemoteMeta(new HashMap<>());
}
} catch (Exception ex) {
//todo: The job status = MANUAL_PAUSE_ERR, No need to set failureReason again
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to fetch meta, " + ex.getMessage());
if (this.getFailureReason() == null
|| !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
// When a job is manually paused, it does not need to be set again,
// otherwise, it may be woken up by auto resume.
this.setFailureReason(
new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to fetch meta, " + ex.getMessage()));
// If fetching meta fails, the job is paused
// and auto resume will automatically wake it up.
this.updateJobStatus(JobStatus.PAUSED);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.job.cdc.request.JobBaseConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
Expand Down Expand Up @@ -116,8 +117,8 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
} catch (Throwable t) {
throw new CdcClientException(
String.format(
"Fail to get or create slot for global stream split, the slot name is %s. Due to: ",
postgresDialect.getSlotName()),
"Fail to get or create slot, the slot name is %s. Due to: %s ",
postgresDialect.getSlotName(), ExceptionUtils.getRootCauseMessage(t)),
t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,23 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
sql """INSERT INTO ${mysqlDb}.${tableName} (name,age) VALUES ('DorisTestPriv',28);"""
}

sleep(20000)

def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
log.info("jobErrorMsg: " + jobErrorMsg)
assert jobErrorMsg.get(0).get(0).contains("Failed to fetch meta")
try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobStatus: " + jobStatus)
// check job status
jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
}
)
} catch (Exception ex){
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
log.info("show job: " + showjob)
log.info("show task: " + showtask)
throw ex;
}

// grant binlog priv to mysqluser
connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
// check job running
try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(3, SECONDS).until(
.pollInterval(1, SECONDS).until(
{
def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobStatus: " + jobStatus)
// check job status
jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
}
)
} catch (Exception ex){
Expand Down
Loading