
Commit dbfa0d6

1 parent 4004b68 commit dbfa0d6

3 files changed, +79 -119 lines changed

be/src/vec/sink/writer/vfile_result_writer.cpp

Lines changed: 2 additions & 0 deletions
@@ -254,10 +254,12 @@ Status VFileResultWriter::_write_file(const Block& block) {
     }
     // split file if exceed limit
     _current_written_bytes = _vfile_writer->written_len();
+    LOG(INFO) << "yy debug update: " << _vfile_writer->written_len();
     return _create_new_file_if_exceed_size();
 }
 
 Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    LOG(INFO) << "yy debug _current_written_bytes: " << _current_written_bytes << ", expect: " << _file_opts->max_file_size_bytes;
     if (_current_written_bytes < _file_opts->max_file_size_bytes) {
         return Status::OK();
     }
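The two added LOG lines instrument Doris's size-based file splitting: after each flushed block the writer records how many bytes the current output file holds (`_vfile_writer->written_len()`), and `_create_new_file_if_exceed_size()` returns early while that count is still below `max_file_size_bytes`, presumably starting a new file otherwise, as its name suggests. A minimal standalone sketch of the same rotation pattern, with hypothetical types and member names (only the byte-count guard mirrors the diff above):

#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical illustration of size-based output-file rotation; not the
// Doris API beyond the written-bytes check shown in the diff above.
class RotatingFileWriter {
public:
    explicit RotatingFileWriter(uint64_t max_file_size_bytes)
            : _max_file_size_bytes(max_file_size_bytes) {}

    void write_block(const std::string& block) {
        _current_written_bytes += block.size();   // analogous to _vfile_writer->written_len()
        std::cout << "block of " << block.size() << " bytes -> file #" << _file_idx << "\n";
        _create_new_file_if_exceed_size();
    }

private:
    void _create_new_file_if_exceed_size() {
        if (_current_written_bytes < _max_file_size_bytes) {
            return;                    // same early-return guard the new LOG precedes
        }
        ++_file_idx;                   // would close the current file and open the next one
        _current_written_bytes = 0;    // byte counter restarts for the new file
    }

    uint64_t _max_file_size_bytes;
    uint64_t _current_written_bytes = 0;
    int _file_idx = 0;
};

int main() {
    RotatingFileWriter writer(/*max_file_size_bytes=*/16);
    writer.write_block("aaaaaaaa");   // 8 bytes, stays in file #0
    writer.write_block("bbbbbbbb");   // reaches 16 bytes, triggers rotation
    writer.write_block("cccccccc");   // lands in file #1
    return 0;
}

In the test suites below the threshold is driven by the max_file_size export property ("5MB" in the CSV case), which is why the tests check how many files end up in the target storage.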

regression-test/suites/export_p2/test_export_max_file_size.groovy

Lines changed: 41 additions & 66 deletions
@@ -22,16 +22,11 @@ suite("test_export_max_file_size", "p2,external") {
     sql """ set enable_nereids_planner=true """
     sql """ set enable_fallback_to_original_planner=false """
 
-    String dfsNameservices=context.config.otherConfigs.get("dfsNameservices")
-    String dfsHaNamenodesHdfsCluster=context.config.otherConfigs.get("dfsHaNamenodesHdfsCluster")
-    String dfsNamenodeRpcAddress1=context.config.otherConfigs.get("dfsNamenodeRpcAddress1")
-    String dfsNamenodeRpcAddress2=context.config.otherConfigs.get("dfsNamenodeRpcAddress2")
-    String dfsNamenodeRpcAddress3=context.config.otherConfigs.get("dfsNamenodeRpcAddress3")
-    String dfsNameservicesPort=context.config.otherConfigs.get("dfsNameservicesPort")
-    String hadoopSecurityAuthentication =context.config.otherConfigs.get("hadoopSecurityAuthentication")
-    String hadoopKerberosKeytabPath =context.config.otherConfigs.get("hadoopKerberosKeytabPath")
-    String hadoopKerberosPrincipal =context.config.otherConfigs.get("hadoopKerberosPrincipal")
-    String hadoopSecurityAutoToLocal =context.config.otherConfigs.get("hadoopSecurityAutoToLocal")
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String s3_endpoint = getS3Endpoint()
+    String region = getS3Region()
+    String bucket = context.config.otherConfigs.get("s3BucketName")
 
     def table_export_name = "test_export_max_file_size"
     // create table and insert
@@ -79,24 +74,24 @@ suite("test_export_max_file_size", "p2,external") {
         DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
     """
 
-    def load_data_path = "/user/export_test/exp_max_file_size.csv"
     sql """
         insert into ${table_export_name}
-        select * from hdfs(
-            "uri" = "hdfs://${dfsNameservices}${load_data_path}",
-            "format" = "csv",
-            "dfs.data.transfer.protection" = "integrity",
-            'dfs.nameservices'="${dfsNameservices}",
-            'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-            'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-            'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-            'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-            'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-            'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-            'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-            'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-            'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
-        );
+        select
+            number as user_id,
+            date_add('2024-01-01', interval cast(rand() * 365 as int) day) as date,
+            date_add('2024-01-01 00:00:00', interval cast(rand() * 365 * 24 * 3600 as int) second) as datetime,
+            concat('City_', cast(cast(rand() * 100 as int) as string)) as city,
+            cast(rand() * 80 + 18 as int) as age,
+            cast(rand() * 2 as int) as sex,
+            if(rand() > 0.5, true, false) as bool_col,
+            cast(rand() * 1000000 as int) as int_col,
+            cast(rand() * 10000000000 as bigint) as bigint_col,
+            cast(rand() * 100000000000000 as largeint) as largeint_col,
+            cast(rand() * 1000 as float) as float_col,
+            rand() * 10000 as double_col,
+            concat('char_', cast(cast(rand() * 10000 as int) as string)) as char_col,
+            cast(rand() * 1000 as decimal(10, 2)) as decimal_col
+        from numbers("number" = "1000000");
     """
 
 
@@ -114,32 +109,26 @@ suite("test_export_max_file_size", "p2,external") {
         }
     }
 
-    def outFilePath = """/user/export_test/test_max_file_size/exp_"""
+    def outFilePath = """${bucket}/export/test_max_file_size/exp_"""
 
     // 1. csv test
     def test_export = {format, file_suffix, isDelete ->
         def uuid = UUID.randomUUID().toString()
         // exec export
         sql """
-            EXPORT TABLE ${table_export_name} TO "hdfs://${dfsNameservices}${outFilePath}"
+            EXPORT TABLE ${table_export_name} TO "s3://${outFilePath}"
             PROPERTIES(
                 "label" = "${uuid}",
                 "format" = "${format}",
                 "max_file_size" = "5MB",
                 "delete_existing_files"="${isDelete}"
             )
-            with HDFS (
-                "dfs.data.transfer.protection" = "integrity",
-                'dfs.nameservices'="${dfsNameservices}",
-                'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-                'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-                'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-                'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-                'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-                'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-                'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-                'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-                'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+            WITH s3 (
+                "s3.endpoint" = "${s3_endpoint}",
+                "s3.region" = "${region}",
+                "s3.secret_key"="${sk}",
+                "s3.access_key" = "${ak}",
+                "provider" = "${getS3Provider()}"
             );
         """
 
@@ -151,20 +140,13 @@ suite("test_export_max_file_size", "p2,external") {
 
         for (int j = 0; j < json.fileNumber[0][0].toInteger(); ++j) {
             def res = sql """
-                select count(*) from hdfs(
-                    "uri" = "${outfile_url}${j}.csv",
+                select count(*) from s3(
+                    "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length())}${j}.csv",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
                     "format" = "csv",
-                    "dfs.data.transfer.protection" = "integrity",
-                    'dfs.nameservices'="${dfsNameservices}",
-                    'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-                    'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-                    'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-                    'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-                    'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-                    'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+                    "provider" = "${getS3Provider()}",
+                    "region" = "${region}"
                 );
             """
             logger.info("res[0][0] = " + res[0][0]);
@@ -175,20 +157,13 @@ suite("test_export_max_file_size", "p2,external") {
             // check data correctness
             sql """
                 insert into ${table_load_name}
-                select * from hdfs(
-                    "uri" = "${outfile_url}${j}.csv",
+                select * from s3(
+                    "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length())}${j}.csv",
+                    "ACCESS_KEY"= "${ak}",
+                    "SECRET_KEY" = "${sk}",
                     "format" = "csv",
-                    "dfs.data.transfer.protection" = "integrity",
-                    'dfs.nameservices'="${dfsNameservices}",
-                    'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-                    'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-                    'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-                    'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-                    'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-                    'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+                    "provider" = "${getS3Provider()}",
+                    "region" = "${region}"
                 );
             """
         }

regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy

Lines changed: 36 additions & 53 deletions
@@ -16,28 +16,22 @@
 // under the License.
 
 suite("test_outfile_orc_max_file_size", "p2,external") {
-    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    String enabled = "true";
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         // open nereids
         sql """ set enable_nereids_planner=true """
         sql """ set enable_fallback_to_original_planner=false """
 
-
-        String dfsNameservices=context.config.otherConfigs.get("dfsNameservices")
-        String dfsHaNamenodesHdfsCluster=context.config.otherConfigs.get("dfsHaNamenodesHdfsCluster")
-        String dfsNamenodeRpcAddress1=context.config.otherConfigs.get("dfsNamenodeRpcAddress1")
-        String dfsNamenodeRpcAddress2=context.config.otherConfigs.get("dfsNamenodeRpcAddress2")
-        String dfsNamenodeRpcAddress3=context.config.otherConfigs.get("dfsNamenodeRpcAddress3")
-        String dfsNameservicesPort=context.config.otherConfigs.get("dfsNameservicesPort")
-        String hadoopSecurityAuthentication =context.config.otherConfigs.get("hadoopSecurityAuthentication")
-        String hadoopKerberosKeytabPath =context.config.otherConfigs.get("hadoopKerberosKeytabPath")
-        String hadoopKerberosPrincipal =context.config.otherConfigs.get("hadoopKerberosPrincipal")
-        String hadoopSecurityAutoToLocal =context.config.otherConfigs.get("hadoopSecurityAutoToLocal")
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String s3_endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = context.config.otherConfigs.get("s3BucketName")
 
         // the path used to load data
-        def load_data_path = "/user/export_test/test_orc_max_file_size.orc"
+        def load_data_path = "export_test/test_orc_max_file_size.orc"
         // the path used to export data
-        def outFilePath = """/user/export_test/test_max_file_size/test_orc/exp_"""
+        def outFilePath = """${bucket}/export/test_max_file_size/test_orc/exp_"""
 
         def create_table = {table_name ->
             sql """ DROP TABLE IF EXISTS ${table_name} """
@@ -69,42 +63,37 @@ suite("test_outfile_orc_max_file_size", "p2,external") {
         // load data
         sql """
             insert into ${table_export_name}
-            select * from hdfs(
-                "uri" = "hdfs://${dfsNameservices}${load_data_path}",
-                "format" = "orc",
-                "dfs.data.transfer.protection" = "integrity",
-                'dfs.nameservices'="${dfsNameservices}",
-                'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-                'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-                'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-                'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-                'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-                'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-                'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-                'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-                'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
-            );
+            select
+                number as user_id,
+                date_add('2024-01-01', interval cast(rand() * 365 as int) day) as date,
+                date_add('2024-01-01 00:00:00', interval cast(rand() * 365 * 24 * 3600 as int) second) as datetime,
+                concat('City_', cast(cast(rand() * 100 as int) as string)) as city,
+                cast(rand() * 80 + 18 as int) as age,
+                cast(rand() * 2 as int) as sex,
+                if(rand() > 0.5, true, false) as bool_col,
+                cast(rand() * 1000000 as int) as int_col,
+                cast(rand() * 10000000000 as bigint) as bigint_col,
+                cast(rand() * 100000000000000 as largeint) as largeint_col,
+                cast(rand() * 1000 as float) as float_col,
+                rand() * 10000 as double_col,
+                concat('char_', cast(cast(rand() * 10000 as int) as string)) as char_col,
+                cast(rand() * 1000 as decimal(10, 2)) as decimal_col
+            from numbers("number" = "2000000");
         """
 
         def test_outfile_orc_success = {maxFileSize, isDelete, fileNumber, totalRows ->
             def table = sql """
                 select * from ${table_export_name}
-                into outfile "hdfs://${dfsNameservices}${outFilePath}"
+                into outfile "s3://${outFilePath}"
                 FORMAT AS ORC
                 PROPERTIES(
                     "max_file_size" = "${maxFileSize}",
                     "delete_existing_files"="${isDelete}",
-                    "dfs.data.transfer.protection" = "integrity",
-                    'dfs.nameservices'="${dfsNameservices}",
-                    'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-                    'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-                    'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-                    'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-                    'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-                    'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+                    "s3.endpoint" = "${s3_endpoint}",
+                    "s3.region" = "${region}",
+                    "s3.secret_key"="${sk}",
+                    "s3.access_key" = "${ak}",
+                    "provider" = "${getS3Provider()}"
                 );
             """
 
@@ -120,22 +109,16 @@ suite("test_outfile_orc_max_file_size", "p2,external") {
         test {
             sql """
                 select * from ${table_export_name}
-                into outfile "hdfs://${dfsNameservices}${outFilePath}"
+                into outfile "s3://${outFilePath}"
                 FORMAT AS ORC
                 PROPERTIES(
                     "max_file_size" = "${maxFileSize}",
                     "delete_existing_files"="${isDelete}",
-                    "dfs.data.transfer.protection" = "integrity",
-                    'dfs.nameservices'="${dfsNameservices}",
-                    'dfs.ha.namenodes.hdfs-cluster'="${dfsHaNamenodesHdfsCluster}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn1'="${dfsNamenodeRpcAddress1}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn2'="${dfsNamenodeRpcAddress2}:${dfsNameservicesPort}",
-                    'dfs.namenode.rpc-address.hdfs-cluster.nn3'="${dfsNamenodeRpcAddress3}:${dfsNameservicesPort}",
-                    'hadoop.security.authentication'="${hadoopSecurityAuthentication}",
-                    'hadoop.kerberos.keytab'="${hadoopKerberosKeytabPath}",
-                    'hadoop.kerberos.principal'="${hadoopKerberosPrincipal}",
-                    'hadoop.security.auth_to_local' = "${hadoopSecurityAutoToLocal}",
-                    'dfs.client.failover.proxy.provider.hdfs-cluster'="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+                    "s3.endpoint" = "${s3_endpoint}",
+                    "s3.region" = "${region}",
+                    "s3.secret_key"="${sk}",
+                    "s3.access_key" = "${ak}",
+                    "provider" = "${getS3Provider()}"
                 );
             """
 