Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.doris.datasource.property.storage;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.ConnectorPropertiesUtils;
import org.apache.doris.datasource.property.ConnectorProperty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -28,6 +30,8 @@
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -256,6 +260,11 @@ protected void setEndpointIfPossible() {
super.setEndpointIfPossible();
}

@Override
public String validateAndNormalizeUri(String uri) throws UserException {
    // Collapse a bucket.endpoint-style authority down to the plain bucket
    // before handing the URI to the generic validation/normalization logic.
    String bucketOnlyUri = rewriteOssBucketIfNecessary(uri);
    return super.validateAndNormalizeUri(bucketOnlyUri);
}

@Override
public void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
Expand Down Expand Up @@ -304,4 +313,72 @@ public void initializeHadoopStorageConfig() {
hadoopStorageConfig.set("fs.oss.accessKeySecret", secretKey);
hadoopStorageConfig.set("fs.oss.endpoint", endpoint);
}

/**
* Rewrites the bucket part of an OSS URI if the bucket is specified
* in the form of bucket.endpoint. https://help.aliyun.com/zh/oss/user-guide/access-oss-via-bucket-domain-name
*
* <p>This method is designed for OSS usage, but it also supports
* the {@code s3://} scheme since OSS URIs are sometimes written
* using the S3-style scheme.</p>
*
* <p>HTTP and HTTPS URIs are returned unchanged.</p>
*
* <p>Examples:
* <pre>
* oss://bucket.endpoint/path -> oss://bucket/path
* s3://bucket.endpoint -> s3://bucket
* https://bucket.endpoint -> unchanged
* </pre>
*
* @param uri the original URI string
* @return the rewritten URI string, or the original URI if no rewrite is needed
*/
@VisibleForTesting
protected static String rewriteOssBucketIfNecessary(String uri) {
    // Rewrites "scheme://bucket.endpoint/..." to "scheme://bucket/..." for
    // non-HTTP(S) schemes (oss://, s3://). HTTP/HTTPS URIs, URIs without an
    // authority, and unparsable URIs are returned unchanged.
    //
    // The rewrite is done by splicing the raw input string rather than
    // rebuilding via the multi-argument URI constructor: URI#getPath()/
    // getQuery()/getFragment() DECODE percent-escapes and the constructor
    // re-encodes them, which silently turns e.g. an encoded "%2F" in the path
    // into a real "/" and changes the path structure. Raw splicing preserves
    // every byte of the path, query, and fragment exactly as the caller wrote it.
    if (uri == null || uri.isEmpty()) {
        return uri;
    }

    URI parsed;
    try {
        parsed = URI.create(uri);
    } catch (IllegalArgumentException e) {
        // Invalid URI, do not rewrite; downstream validation will reject it.
        return uri;
    }

    String scheme = parsed.getScheme();
    if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
        // Virtual-hosted HTTP(S) endpoints are legal as-is.
        return uri;
    }

    // Raw (still-encoded) authority so its length matches the input string,
    // allowing exact offset-based splicing below.
    String authority = parsed.getRawAuthority();
    if (authority == null || authority.isEmpty()) {
        return uri;
    }

    // Only "bucket.endpoint" authorities need rewriting. OSS bucket names may
    // not contain '.', so everything before the first dot is the bucket.
    int dotIndex = authority.indexOf('.');
    if (dotIndex <= 0) {
        return uri;
    }
    String bucket = authority.substring(0, dotIndex);

    // A hierarchical URI with an authority always carries "//"; guard anyway.
    int slashes = uri.indexOf("//");
    if (slashes < 0) {
        return uri;
    }
    int authorityStart = slashes + 2;
    int authorityEnd = authorityStart + authority.length();
    // scheme + "//" + bucket + untouched remainder (path/query/fragment).
    return uri.substring(0, authorityStart) + bucket + uri.substring(authorityEnd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,12 @@ public void testS3DisableHadoopCache() throws UserException {
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false));
}

@Test
public void testOSSBucketEndpointPathProperties() throws UserException {
    // Each entry: {expected rewritten URI, input URI}. oss:// and s3://
    // bucket.endpoint authorities collapse to the bare bucket; plain-bucket
    // and https:// URIs pass through unchanged.
    String[][] cases = {
            {"oss://my-bucket/path/to/dir/",
                    "oss://my-bucket/path/to/dir/"},
            {"oss://my-bucket/path/to/dir/file.txt",
                    "oss://my-bucket.oss-cn-hangzhou.aliyuncs.com/path/to/dir/file.txt"},
            {"s3://my-bucket/path/to/dir/file.txt",
                    "s3://my-bucket.oss-cn-hangzhou.aliyuncs.com/path/to/dir/file.txt"},
            {"https://bucket-name.oss-cn-hangzhou.aliyuncs.com/path/to/dir/file.txt",
                    "https://bucket-name.oss-cn-hangzhou.aliyuncs.com/path/to/dir/file.txt"},
    };
    for (String[] c : cases) {
        Assertions.assertEquals(c[0], OSSProperties.rewriteOssBucketIfNecessary(c[1]));
    }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ suite("test_s3_tvf_s3_storage", "p0,external,external_docker") {
s3_tvf("http://${bucket}.${s3_endpoint}", "", "s3.access_key", "s3.secret_key", "region", "false");
s3_tvf("http://${bucket}.${s3_endpoint}", "", "AWS_ACCESS_KEY", "AWS_SECRET_KEY", "region", "false");
s3_tvf("http://${bucket}.${s3_endpoint}", "", "s3.access_key", "s3.secret_key", "s3.region", "false");
s3_tvf("oss://${bucket}.${s3_endpoint}", "", "s3.access_key", "s3.secret_key", "s3.region", "false");
s3_tvf("s3://${bucket}.${s3_endpoint}", "", "s3.access_key", "s3.secret_key", "s3.region", "false");
shouldFail {
// it's OSS
s3_tvf("http://${bucket}.${s3_endpoint}", "", "s3.access_key", "cos.secret_key", "region", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
String oss_endpoint = context.config.otherConfigs.get("aliYunEndpoint")
String bucket = context.config.otherConfigs.get("aliYunBucket")
String oss_parent_path = "${bucket}/refactor-test"
String oss_bucket_endpoint_parent_path="${bucket}.${oss_endpoint}/refactor-test"
String oss_region = context.config.otherConfigs.get("aliYunRegion")
String oss_region_param = """
'oss.region' = '${oss_region}',
Expand Down Expand Up @@ -514,6 +515,13 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
//OSS - Insert overwrite tests
db_location = "oss://${oss_parent_path}/hive/hms/overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_properties + oss_storage_properties, "hive_hms_oss_overwrite_test", db_location)
//OSS - Partition table tests (fix for partition path scheme mismatch)
db_location = "oss://${oss_bucket_endpoint_parent_path}/hive/hms/bucket_endpoint/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + oss_storage_properties, "hive_hms_oss_partition_test", db_location)
testPartitionTableInsert(hms_properties + oss_region_param + oss_storage_properties, "hive_hms_oss_bucket_endpoint_partition_test_region", db_location)
//OSS - Insert overwrite tests
db_location = "oss://${oss_bucket_endpoint_parent_path}/hive/hms/bucket_endpoint/overwrite/" + System.currentTimeMillis()
testInsertOverwrite(hms_properties + oss_storage_properties, "hive_hms_oss_bucket_endpoint_overwrite_test", db_location)

//s3
db_location = "s3a://${s3_parent_path}/hive/hms/"+System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property
String oss_endpoint = context.config.otherConfigs.get("aliYunEndpoint")
String oss_bucket = context.config.otherConfigs.get("aliYunBucket")
String oss_parent_path = "${oss_bucket}/refactor-test"
String oss_bucket_endpoint_parent_path="${oss_bucket}.${oss_endpoint}/refactor-test"
String oss_region = context.config.otherConfigs.get("aliYunRegion")
String oss_region_param = """
'oss.region' = '${oss_region}',
Expand Down Expand Up @@ -363,6 +364,10 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property
testQueryAndInsert(iceberg_hms_type_prop + hms_kerberos_old_prop + warehouse + oss_storage_properties, "iceberg_hms_on_oss_kerberos_old")
//new kerberos
testQueryAndInsert(iceberg_hms_type_prop + hms_kerberos_new_prop + warehouse + oss_storage_properties, "iceberg_hms_on_oss_kerberos_new")
warehouse """
'warehouse' = 'oss://${oss_bucket_endpoint_parent_path}/iceberg-hms-warehouse',
"""
testQueryAndInsert(iceberg_hms_type_prop + hms_prop + warehouse + oss_region_param + oss_storage_properties, "iceberg_hms_on_oss")

/*--------HMS on OBS-----------*/
warehouse = """
Expand Down Expand Up @@ -490,6 +495,12 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external,new_catalog_property
"""
testQueryAndInsert(iceberg_file_system_catalog_properties + warehouse + oss_storage_properties, "iceberg_fs_on_oss")
testQueryAndInsert(iceberg_file_system_catalog_properties + warehouse + oss_region_param + oss_storage_properties, "iceberg_fs_on_oss_region")

warehouse = """
'warehouse' = 'oss://${oss_bucket_endpoint_parent_path}/iceberg-fs-oss-warehouse',
"""
testQueryAndInsert(iceberg_file_system_catalog_properties + warehouse + oss_region_param + oss_storage_properties, "iceberg_fs_on_oss_region")

/** HDFS **/
warehouse = """
'warehouse' = '${hdfs_parent_path}/iceberg-fs-hdfs-warehouse',
Expand Down