
Commit 93ff2fc

2 csv pass
1 parent 3c29d14 commit 93ff2fc

10 files changed: 194 additions and 57 deletions

be/src/pipeline/exec/operator.cpp

Lines changed: 3 additions & 0 deletions
@@ -40,6 +40,7 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/hive_table_sink_operator.h"
+#include "pipeline/exec/tvf_table_sink_operator.h"
 #include "pipeline/exec/iceberg_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
@@ -817,6 +818,7 @@ DECLARE_OPERATOR(ResultFileSinkLocalState)
 DECLARE_OPERATOR(OlapTableSinkLocalState)
 DECLARE_OPERATOR(OlapTableSinkV2LocalState)
 DECLARE_OPERATOR(HiveTableSinkLocalState)
+DECLARE_OPERATOR(TVFTableSinkLocalState)
 DECLARE_OPERATOR(IcebergTableSinkLocalState)
 DECLARE_OPERATOR(AnalyticSinkLocalState)
 DECLARE_OPERATOR(BlackholeSinkLocalState)
@@ -936,6 +938,7 @@ template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOp
 template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
 template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, HiveTableSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter, IcebergTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VTVFTableWriter, TVFTableSinkOperatorX>;

 #ifdef BE_TEST
 template class OperatorX<DummyOperatorLocalState>;
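
Note on the added AsyncWriterSink instantiation: it mirrors the existing lines above it. When a class template's member functions are defined out of line in a .cpp file, each concrete writer/operator pairing used elsewhere must be instantiated explicitly in that translation unit, otherwise the linker cannot find its symbols. A minimal single-file sketch of the idiom follows; WriterSink, StubWriter and StubOperator are illustrative stand-ins, not the real Doris types.

// Sketch of explicit template instantiation (illustrative names only).
#include <iostream>

template <typename Writer, typename Operator>
class WriterSink {
public:
    void open() { std::cout << "sink opened\n"; }
};

struct StubWriter {};
struct StubOperator {};

// Analogue of the line added in operator.cpp: force this concrete
// instantiation so its symbols are emitted in this translation unit.
template class WriterSink<StubWriter, StubOperator>;

int main() {
    WriterSink<StubWriter, StubOperator> sink;
    sink.open();
}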

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,6 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/hive_table_sink_operator.h"
-#include "pipeline/exec/tvf_table_sink_operator.h"
 #include "pipeline/exec/iceberg_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
@@ -101,6 +100,7 @@
 #include "pipeline/exec/spill_sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_operator.h"
 #include "pipeline/exec/table_function_operator.h"
+#include "pipeline/exec/tvf_table_sink_operator.h"
 #include "pipeline/exec/union_sink_operator.h"
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"

be/src/vec/sink/writer/vtvf_table_writer.cpp

Lines changed: 89 additions & 15 deletions
@@ -53,10 +53,63 @@ Status VTVFTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     _writer_close_timer = ADD_TIMER(writer_profile, "FileWriterCloseTime");

     _file_path = _tvf_sink.file_path;
-    _max_file_size_bytes = _tvf_sink.__isset.max_file_size_bytes ? _tvf_sink.max_file_size_bytes : 0;
+    _max_file_size_bytes =
+            _tvf_sink.__isset.max_file_size_bytes ? _tvf_sink.max_file_size_bytes : 0;
     _delete_existing_files_flag =
             _tvf_sink.__isset.delete_existing_files ? _tvf_sink.delete_existing_files : true;

+    // Log all parameters received from FE
+    LOG(INFO) << "TVF table writer open, query_id=" << print_id(_state->query_id())
+              << ", tvf_name=" << _tvf_sink.tvf_name
+              << ", file_path=" << _tvf_sink.file_path
+              << ", file_format=" << _tvf_sink.file_format
+              << ", file_type=" << _tvf_sink.file_type
+              << ", max_file_size_bytes=" << _max_file_size_bytes
+              << ", delete_existing_files=" << _delete_existing_files_flag;
+
+    if (_tvf_sink.__isset.column_separator) {
+        LOG(INFO) << "TVF table writer column_separator=[" << _tvf_sink.column_separator << "]";
+    }
+    if (_tvf_sink.__isset.line_delimiter) {
+        LOG(INFO) << "TVF table writer line_delimiter=[" << _tvf_sink.line_delimiter << "]";
+    }
+    if (_tvf_sink.__isset.compression_type) {
+        LOG(INFO) << "TVF table writer compression_type=" << _tvf_sink.compression_type;
+    }
+    if (_tvf_sink.__isset.backend_id) {
+        LOG(INFO) << "TVF table writer backend_id=" << _tvf_sink.backend_id;
+    }
+
+    // Log columns info
+    if (_tvf_sink.__isset.columns) {
+        LOG(INFO) << "TVF table writer columns count=" << _tvf_sink.columns.size();
+        for (size_t i = 0; i < _tvf_sink.columns.size(); i++) {
+            const auto& col = _tvf_sink.columns[i];
+            LOG(INFO) << "TVF table writer column[" << i << "]: name=" << col.column_name
+                      << ", type=" << col.column_type;
+        }
+    } else {
+        LOG(INFO) << "TVF table writer: no columns info from FE";
+    }
+
+    // Log properties
+    if (_tvf_sink.__isset.properties) {
+        LOG(INFO) << "TVF table writer properties count=" << _tvf_sink.properties.size();
+        for (const auto& kv : _tvf_sink.properties) {
+            LOG(INFO) << "TVF table writer property: " << kv.first << "=" << kv.second;
+        }
+    } else {
+        LOG(INFO) << "TVF table writer: no properties from FE";
+    }
+
+    // Log hadoop config
+    if (_tvf_sink.__isset.hadoop_config) {
+        LOG(INFO) << "TVF table writer hadoop_config count=" << _tvf_sink.hadoop_config.size();
+        for (const auto& kv : _tvf_sink.hadoop_config) {
+            LOG(INFO) << "TVF table writer hadoop_config: " << kv.first << "=" << kv.second;
+        }
+    }
+
     // Delete existing files if requested
     if (_delete_existing_files_flag) {
         RETURN_IF_ERROR(_delete_existing_files());
@@ -66,6 +119,19 @@ Status VTVFTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
 }

 Status VTVFTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+    if (block.rows() > 0 && _written_rows_counter->value() == 0) {
+        // Log first block structure for debugging
+        LOG(INFO) << "TVF table writer first block: rows=" << block.rows()
+                  << ", columns=" << block.columns()
+                  << ", query_id=" << print_id(_state->query_id());
+        for (size_t i = 0; i < block.columns(); i++) {
+            const auto& col_with_name = block.get_by_position(i);
+            LOG(INFO) << "TVF table writer block column[" << i << "]: name=" << col_with_name.name
+                      << ", type=" << col_with_name.type->get_name()
+                      << ", rows=" << col_with_name.column->size();
+        }
+    }
+
     COUNTER_UPDATE(_written_rows_counter, block.rows());

     {
@@ -99,31 +165,38 @@ Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
         properties = _tvf_sink.properties;
     }

-    _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
-            file_type, _state->exec_env(), {}, properties, file_name,
-            {
-                    .write_file_cache = false,
-                    .sync_file_data = false,
-            }));
+    LOG(INFO) << "TVF table writer creating file writer: file_name=" << file_name
+              << ", file_type=" << file_type
+              << ", properties_count=" << properties.size()
+              << ", output_expr_count=" << _vec_output_expr_ctxs.size();
+
+    _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(file_type, _state->exec_env(), {},
+                                                                  properties, file_name,
+                                                                  {
+                                                                          .write_file_cache = false,
+                                                                          .sync_file_data = false,
+                                                                  }));

     TFileFormatType::type format = _tvf_sink.file_format;
     switch (format) {
     case TFileFormatType::FORMAT_CSV_PLAIN: {
-        std::string column_separator =
+        _column_separator =
                 _tvf_sink.__isset.column_separator ? _tvf_sink.column_separator : ",";
-        std::string line_delimiter =
+        _line_delimiter =
                 _tvf_sink.__isset.line_delimiter ? _tvf_sink.line_delimiter : "\n";
         TFileCompressType::type compress_type = TFileCompressType::PLAIN;
         if (_tvf_sink.__isset.compression_type) {
             compress_type = _tvf_sink.compression_type;
         }
-        _vfile_writer.reset(new VCSVTransformer(_state, _file_writer_impl.get(),
-                                                _vec_output_expr_ctxs, false, {}, {}, column_separator,
-                                                line_delimiter, false, compress_type));
+        LOG(INFO) << "TVF table writer CSV config: column_separator=[" << _column_separator
+                  << "], line_delimiter=[" << _line_delimiter
+                  << "], compress_type=" << compress_type;
+        _vfile_writer.reset(new VCSVTransformer(
+                _state, _file_writer_impl.get(), _vec_output_expr_ctxs, false, {}, {},
+                _column_separator, _line_delimiter, false, compress_type));
         break;
     }
     case TFileFormatType::FORMAT_PARQUET: {
-        // Build parquet schemas from columns
         std::vector<TParquetSchema> parquet_schemas;
         if (_tvf_sink.__isset.columns) {
             for (const auto& col : _tvf_sink.columns) {
@@ -132,6 +205,7 @@ Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
                 parquet_schemas.push_back(schema);
             }
         }
+        LOG(INFO) << "TVF table writer Parquet config: schema_count=" << parquet_schemas.size();
         _vfile_writer.reset(new VParquetTransformer(
                 _state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas, false,
                 {TParquetCompressionType::SNAPPY, TParquetVersion::PARQUET_1_0, false, false}));
@@ -142,6 +216,7 @@ Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
         if (_tvf_sink.__isset.compression_type) {
             compress_type = _tvf_sink.compression_type;
         }
+        LOG(INFO) << "TVF table writer ORC config: compress_type=" << compress_type;
         _vfile_writer.reset(new VOrcTransformer(_state, _file_writer_impl.get(),
                                                 _vec_output_expr_ctxs, "", {}, false,
                                                 compress_type));
@@ -151,8 +226,7 @@ Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
         return Status::InternalError("Unsupported TVF sink file format: {}", format);
     }

-    LOG(INFO) << "TVF table writer created file: " << file_name
-              << ", format: " << format
+    LOG(INFO) << "TVF table writer created file: " << file_name << ", format: " << format
               << ", query_id: " << print_id(_state->query_id());

     return _vfile_writer->open();
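
Throughout open() and _create_file_writer(), the writer only reads an optional Thrift field after checking its __isset flag, otherwise it falls back to a default ("," separator, "\n" delimiter, PLAIN compression, delete_existing_files = true). A minimal standalone sketch of that fallback idiom; MockTVFSink is a hypothetical stand-in for the Thrift-generated sink descriptor, not the real type.

// Sketch of the __isset default-fallback pattern used by the writer.
#include <iostream>
#include <string>

struct MockTVFSink {
    struct {
        bool column_separator = false;
        bool line_delimiter = false;
    } __isset;
    std::string column_separator;
    std::string line_delimiter;
};

int main() {
    MockTVFSink sink; // FE set neither optional field
    // Same fallback rule as FORMAT_CSV_PLAIN in _create_file_writer():
    std::string sep = sink.__isset.column_separator ? sink.column_separator : ",";
    std::string eol = sink.__isset.line_delimiter ? sink.line_delimiter : "\n";
    std::cout << "separator=[" << sep << "], delimiter=[" << (eol == "\n" ? "\\n" : eol) << "]\n";
}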

be/src/vec/sink/writer/vtvf_table_writer.h

Lines changed: 2 additions & 0 deletions
@@ -77,6 +77,8 @@ class VTVFTableWriter final : public AsyncResultWriter {
     int _file_idx = 0;
     bool _delete_existing_files_flag = true;
     std::string _file_path;
+    std::string _column_separator;
+    std::string _line_delimiter;

     // profile counters
     RuntimeProfile::Counter* _written_rows_counter = nullptr;

fe/.idea/vcs.xml

Lines changed: 6 additions & 18 deletions
Generated file; diff not rendered by default.

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java

Lines changed: 1 addition & 0 deletions
@@ -465,6 +465,7 @@ public PlanFragment visitPhysicalTVFTableSink(PhysicalTVFTableSink<? extends Pla
         PlanFragment rootFragment = tvfSink.child().accept(this, context);
         rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
         TVFTableSink sink = new TVFTableSink(
+                rootFragment.getPlanRoot().getId(),
                 tvfSink.getTvfName(), tvfSink.getProperties(), tvfSink.getCols());
         try {
             sink.bindDataSink();

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java

Lines changed: 76 additions & 6 deletions
@@ -97,6 +97,10 @@
 import org.apache.doris.qe.AutoCloseSessionVariable;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.LocalTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
 import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;

 import com.google.common.base.Preconditions;
@@ -108,6 +112,8 @@
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import java.util.ArrayList;
 import java.util.HashMap;
@@ -121,6 +127,8 @@
  * bind an unbound logicalTableSink represent the target table of an insert command
  */
 public class BindSink implements AnalysisRuleFactory {
+    private static final Logger LOG = LogManager.getLogger(BindSink.class);
+
     public boolean needTruncateStringWhenInsert;

     public BindSink() {
@@ -583,17 +591,79 @@ private Plan bindTVFTableSink(MatchingContext<UnboundTVFTableSink<Plan>> ctx) {

         LogicalPlan child = ((LogicalPlan) sink.child());

-        // Derive columns from child query output
-        List<Column> cols = child.getOutput().stream()
-                .map(slot -> new Column(slot.getName(), slot.getDataType().toCatalogDataType()))
-                .collect(ImmutableList.toImmutableList());
+        // Determine target schema: if append mode and file exists, use existing file schema;
+        // otherwise derive from child query output.
+        boolean deleteExisting = Boolean.parseBoolean(
+                properties.getOrDefault("delete_existing_files", "true"));
+        List<Column> cols = null;
+        if (!deleteExisting) {
+            cols = tryGetExistingFileSchema(tvfName, properties);
+        }
+        if (cols == null) {
+            cols = child.getOutput().stream()
+                    .map(slot -> new Column(slot.getName(), slot.getDataType().toCatalogDataType()))
+                    .collect(ImmutableList.toImmutableList());
+        }
+
+        // Validate column count
+        if (cols.size() != child.getOutput().size()) {
+            throw new AnalysisException(
+                    "insert into cols should be corresponding to the query output"
+                            + ", target columns: " + cols.size()
+                            + ", query output: " + child.getOutput().size());
+        }
+
+        // Build columnToOutput mapping and reuse getOutputProjectByCoercion for type cast,
+        // same as OlapTable INSERT INTO.
+        Map<String, NamedExpression> columnToOutput = Maps.newLinkedHashMap();
+        for (int i = 0; i < cols.size(); i++) {
+            Column col = cols.get(i);
+            NamedExpression childExpr = (NamedExpression) child.getOutput().get(i);
+            Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
+                    childExpr, DataType.fromCatalogType(col.getType())), col.getName());
+            columnToOutput.put(col.getName(), output);
+        }
+        LogicalProject<?> projectWithCast = getOutputProjectByCoercion(cols, child, columnToOutput);

-        List<NamedExpression> outputExprs = child.getOutput().stream()
+        List<NamedExpression> outputExprs = projectWithCast.getOutput().stream()
                 .map(NamedExpression.class::cast)
                 .collect(ImmutableList.toImmutableList());

         return new LogicalTVFTableSink<>(tvfName, properties, cols, outputExprs,
-                Optional.empty(), Optional.empty(), child);
+                Optional.empty(), Optional.empty(), projectWithCast);
+    }
+
+    /**
+     * Try to instantiate the corresponding TVF to read the existing file's schema.
+     * Returns null if the file does not exist or schema inference fails.
+     */
+    private List<Column> tryGetExistingFileSchema(String tvfName, Map<String, String> properties) {
+        try {
+            ExternalFileTableValuedFunction tvf;
+            Map<String, String> propsCopy = new HashMap<>(properties);
+            switch (tvfName) {
+                case "local":
+                    tvf = new LocalTableValuedFunction(propsCopy);
+                    break;
+                case "s3":
+                    tvf = new S3TableValuedFunction(propsCopy);
+                    break;
+                case "hdfs":
+                    tvf = new HdfsTableValuedFunction(propsCopy);
+                    break;
+                default:
+                    return null;
+            }
+            List<Column> columns = tvf.getTableColumns();
+            if (columns != null && !columns.isEmpty()) {
+                return columns;
+            }
+        } catch (Exception e) {
+            // File does not exist or schema inference failed — fall back to child query schema
+            LOG.info("TVF sink: could not read existing file schema for append mode, "
+                    + "will use child query schema. Reason: " + e.getMessage());
+        }
+        return null;
     }

     private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) {

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java

Lines changed: 5 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.analyzer.UnboundResultSink;
+import org.apache.doris.nereids.analyzer.UnboundTVFTableSink;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.parser.NereidsParser;
@@ -139,6 +140,10 @@ private Plan collectFromAny(MatchingContext<Plan> ctx) {
     }

     private Plan collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) {
+        // TVF sink (local/s3/hdfs) is not a real table, skip table collection
+        if (ctx.root instanceof UnboundTVFTableSink) {
+            return null;
+        }
         List<String> nameParts = ctx.root.getNameParts();
         switch (nameParts.size()) {
             case 1:
