2929
3030import java .io .File ;
3131import java .util .Optional ;
32+ import java .util .stream .Stream ;
3233import javax .annotation .Nullable ;
3334
34-
3535/**
3636 * Base configuration for HTTP Source and HTTP Sink
3737 */
@@ -48,6 +48,8 @@ public abstract class BaseHttpConfig extends ReferencePluginConfig {
4848 public static final String PROPERTY_PROXY_URL = "proxyUrl" ;
4949 public static final String PROPERTY_PROXY_USERNAME = "proxyUsername" ;
5050 public static final String PROPERTY_PROXY_PASSWORD = "proxyPassword" ;
51+ public static final String PROPERTY_OAUTH2_GRANT_TYPE = "oauth2GrantType" ;
52+ public static final String PROPERTY_OAUTH2_CLIENT_AUTHENTICATION = "oauth2ClientAuthentication" ;
5153
5254 public static final String PROPERTY_AUTH_TYPE_LABEL = "Auth type" ;
5355
@@ -93,6 +95,18 @@ public abstract class BaseHttpConfig extends ReferencePluginConfig {
9395 @ Macro
9496 protected String authUrl ;
9597
98+ @ Nullable
99+ @ Name (PROPERTY_OAUTH2_GRANT_TYPE )
100+ @ Description ("Which Oauth2 grant type flow is used." )
101+ @ Macro
102+ protected String oauth2GrantType ;
103+
104+ @ Nullable
105+ @ Name (PROPERTY_OAUTH2_CLIENT_AUTHENTICATION )
106+ @ Description ("Send auth credentials in the request body or as query param." )
107+ @ Macro
108+ protected String oauth2ClientAuthentication ;
109+
96110 @ Nullable
97111 @ Name (PROPERTY_TOKEN_URL )
98112 @ Description ("Endpoint for the resource server, which exchanges the authorization code for an access token." )
@@ -208,6 +222,19 @@ public String getOAuth2Enabled() {
208222 return oauth2Enabled ;
209223 }
210224
225+ public OAuth2GrantType getOauth2GrantType () {
226+ OAuth2GrantType grantType = OAuth2GrantType .getGrantType (oauth2GrantType );
227+ return getEnumValueByString (OAuth2GrantType .class , grantType .getValue (),
228+ PROPERTY_OAUTH2_GRANT_TYPE );
229+ }
230+
231+ public OAuth2ClientAuthentication getOauth2ClientAuthentication () {
232+ OAuth2ClientAuthentication clientAuthentication = OAuth2ClientAuthentication .getClientAuthentication (
233+ oauth2ClientAuthentication );
234+ return getEnumValueByString (OAuth2ClientAuthentication .class ,
235+ clientAuthentication .getValue (), PROPERTY_OAUTH2_CLIENT_AUTHENTICATION );
236+ }
237+
211238 @ Nullable
212239 public String getAuthUrl () {
213240 return authUrl ;
@@ -359,19 +386,7 @@ public void validate(FailureCollector failureCollector) {
359386 AuthType authType = getAuthType ();
360387 switch (authType ) {
361388 case OAUTH2 :
362- String reasonOauth2 = "OAuth2 is enabled" ;
363- if (!containsMacro (PROPERTY_TOKEN_URL )) {
364- assertIsSet (getTokenUrl (), PROPERTY_TOKEN_URL , reasonOauth2 );
365- }
366- if (!containsMacro (PROPERTY_CLIENT_ID )) {
367- assertIsSet (getClientId (), PROPERTY_CLIENT_ID , reasonOauth2 );
368- }
369- if (!containsMacro ((PROPERTY_CLIENT_SECRET ))) {
370- assertIsSet (getClientSecret (), PROPERTY_CLIENT_SECRET , reasonOauth2 );
371- }
372- if (!containsMacro (PROPERTY_REFRESH_TOKEN )) {
373- assertIsSet (getRefreshToken (), PROPERTY_REFRESH_TOKEN , reasonOauth2 );
374- }
389+ validateOAuth2Fields (failureCollector );
375390 break ;
376391 case SERVICE_ACCOUNT :
377392 String reasonSA = "Service Account is enabled" ;
@@ -405,4 +420,65 @@ public static void assertIsSet(Object propertyValue, String propertyName, String
405420 String .format ("Property '%s' must be set, since %s" , propertyName , reason ), propertyName );
406421 }
407422 }
423+
424+ public static void assertIsSetWithFailureCollector (Object propertyValue , String propertyName , String reason ,
425+ FailureCollector failureCollector ) {
426+ if (propertyValue == null ) {
427+ failureCollector .addFailure (String .format ("Property '%s' must be set, since %s" , propertyName , reason ),
428+ null ).withConfigProperty (propertyName );
429+ }
430+ }
431+
432+ private void validateOAuth2Fields (FailureCollector failureCollector ) {
433+ String reasonOauth2GrantType = String .format ("OAuth2 is enabled and grant type is %s." ,
434+ getOauth2GrantType ().getValue ());
435+ if (!containsMacro (PROPERTY_TOKEN_URL )) {
436+ assertIsSetWithFailureCollector (getTokenUrl (), PROPERTY_TOKEN_URL ,
437+ reasonOauth2GrantType , failureCollector );
438+ }
439+ if (!containsMacro (PROPERTY_CLIENT_ID )) {
440+ assertIsSetWithFailureCollector (getClientId (), PROPERTY_CLIENT_ID ,
441+ reasonOauth2GrantType , failureCollector );
442+ }
443+ if (!containsMacro (PROPERTY_CLIENT_SECRET )) {
444+ assertIsSetWithFailureCollector (getClientSecret (), PROPERTY_CLIENT_SECRET ,
445+ reasonOauth2GrantType , failureCollector );
446+ }
447+ if (!containsMacro (PROPERTY_OAUTH2_CLIENT_AUTHENTICATION )) {
448+ assertIsSetWithFailureCollector (getOauth2ClientAuthentication (),
449+ PROPERTY_OAUTH2_CLIENT_AUTHENTICATION , reasonOauth2GrantType , failureCollector );
450+ }
451+ // in case of refresh token grant type, also check additional fields
452+ if (OAuth2GrantType .REFRESH_TOKEN .equals (getOauth2GrantType ())) {
453+ if (!containsMacro (PROPERTY_REFRESH_TOKEN )) {
454+ assertIsSetWithFailureCollector (getRefreshToken (), PROPERTY_REFRESH_TOKEN ,
455+ reasonOauth2GrantType , failureCollector );
456+ }
457+ }
458+ failureCollector .getOrThrowException ();
459+ }
460+
461+ /**
462+ * Retrieves the corresponding enum constant of a given string value.
463+ *
464+ * <p>This method takes an enum class that implements {@code EnumWithValue} and searches for an
465+ * enum constant that matches the provided string value. If no matching value is found, it throws
466+ * an {@code InvalidConfigPropertyException}.</p>
467+ *
468+ * @param <T> the type of enum that implements {@code EnumWithValue}
469+ * @param enumClass the class of the enum to search within
470+ * @param stringValue the string representation of the enum value
471+ * @param propertyName the name of the property (used for error messages)
472+ * @return the corresponding enum constant if a match is found
473+ * @throws InvalidConfigPropertyException if the string value does not match any enum constant
474+ */
475+ public static <T extends EnumWithValue > T
476+ getEnumValueByString (Class <T > enumClass , String stringValue , String propertyName ) {
477+ return Stream .of (enumClass .getEnumConstants ())
478+ .filter (keyType -> keyType .getValue ().equalsIgnoreCase (stringValue ))
479+ .findAny ()
480+ .orElseThrow (() -> new InvalidConfigPropertyException (
481+ String .format ("Unsupported value for '%s': '%s'" , propertyName , stringValue ),
482+ propertyName ));
483+ }
408484}
0 commit comments