Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/elasticsearch-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<dependencies>
<dependency>
<groupId>org.hibernate.search</groupId>
<artifactId>hibernate-search-backend-elasticsearch</artifactId>
<artifactId>hibernate-search-backend-elasticsearch-client-common</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import org.hibernate.search.backend.elasticsearch.aws.cfg.ElasticsearchAwsCredentialsTypeNames;
import org.hibernate.search.backend.elasticsearch.aws.spi.ElasticsearchAwsCredentialsProvider;
import org.hibernate.search.backend.elasticsearch.client.ElasticsearchHttpClientConfigurer;
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequestInterceptorProvider;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.bean.spi.BeanConfigurationContext;
import org.hibernate.search.engine.environment.bean.spi.BeanConfigurer;
Expand All @@ -17,8 +17,8 @@ public class ElasticsearchAwsBeanConfigurer implements BeanConfigurer {
@Override
public void configure(BeanConfigurationContext context) {
context.define(
ElasticsearchHttpClientConfigurer.class,
beanResolver -> BeanHolder.of( new ElasticsearchAwsHttpClientConfigurer() )
ElasticsearchRequestInterceptorProvider.class,
beanResolver -> BeanHolder.of( new ElasticsearchAwsSigningInterceptorProvider() )
);
context.define(
ElasticsearchAwsCredentialsProvider.class, ElasticsearchAwsCredentialsTypeNames.DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
*/
package org.hibernate.search.backend.elasticsearch.aws.impl;

import org.hibernate.search.backend.elasticsearch.ElasticsearchDistributionName;
import org.hibernate.search.backend.elasticsearch.ElasticsearchVersion;
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Pattern;

import org.hibernate.search.backend.elasticsearch.aws.cfg.ElasticsearchAwsBackendSettings;
import org.hibernate.search.backend.elasticsearch.aws.cfg.ElasticsearchAwsCredentialsTypeNames;
import org.hibernate.search.backend.elasticsearch.aws.logging.impl.AwsLog;
import org.hibernate.search.backend.elasticsearch.aws.spi.ElasticsearchAwsCredentialsProvider;
import org.hibernate.search.backend.elasticsearch.client.ElasticsearchHttpClientConfigurationContext;
import org.hibernate.search.backend.elasticsearch.client.ElasticsearchHttpClientConfigurer;
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequestInterceptor;
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequestInterceptorProvider;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.cfg.spi.OptionalConfigurationProperty;
Expand All @@ -22,8 +24,8 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;

public class ElasticsearchAwsHttpClientConfigurer implements ElasticsearchHttpClientConfigurer {

public class ElasticsearchAwsSigningInterceptorProvider implements ElasticsearchRequestInterceptorProvider {
private static final Pattern DISTRIBUTION_NAME_PATTERN = Pattern.compile( "([^\\d]+)?(?:(?<=^)|(?=$)|(?<=.):(?=.))(.+)?" );
private static final ConfigurationProperty<Boolean> SIGNING_ENABLED =
ConfigurationProperty.forKey( ElasticsearchAwsBackendSettings.SIGNING_ENABLED )
.asBoolean()
Expand Down Expand Up @@ -52,36 +54,48 @@ public class ElasticsearchAwsHttpClientConfigurer implements ElasticsearchHttpCl
.asString()
.build();

static final OptionalConfigurationProperty<String> DISTRIBUTION_NAME =
ConfigurationProperty.forKey( "version" )
.asString()
.build();

@Override
public void configure(ElasticsearchHttpClientConfigurationContext context) {
public Optional<ElasticsearchRequestInterceptor> provide(Context context) {
ConfigurationPropertySource propertySource = context.configurationPropertySource();

if ( !SIGNING_ENABLED.get( propertySource ) ) {
AwsLog.INSTANCE.signingDisabled();
return;
return Optional.empty();
}

Region region = REGION.getAndMapOrThrow( propertySource, Region::of, AwsLog.INSTANCE::missingPropertyForSigning );
String service;
switch ( context.configuredVersion().map( ElasticsearchVersion::distribution )
.orElse( ElasticsearchDistributionName.OPENSEARCH ) ) {
case AMAZON_OPENSEARCH_SERVERLESS:
service = "aoss";
break;
case ELASTIC:
case OPENSEARCH:
default:
service = "es";
break;

String distributionName = DISTRIBUTION_NAME.getAndTransform( propertySource,
v -> v.map( ver -> ver.toLowerCase( Locale.ROOT ) )
.map( DISTRIBUTION_NAME_PATTERN::matcher )
.map( matcher -> {
if ( matcher.matches() ) {
return matcher.group( 1 );
}
return null;
} ).orElse( "opensearch" ) );

if ( "amazon-opensearch-serverless".equals( distributionName ) ) {
service = "aoss";
}
else {
service = "es";
}

AwsCredentialsProvider credentialsProvider = createCredentialsProvider( context.beanResolver(), propertySource );

AwsLog.INSTANCE.signingEnabled( region, service, credentialsProvider );

AwsSigningRequestInterceptor signingInterceptor =
new AwsSigningRequestInterceptor( region, service, credentialsProvider );
ElasticsearchAwsSigningRequestInterceptor signingInterceptor =
new ElasticsearchAwsSigningRequestInterceptor( region, service, credentialsProvider );

context.clientBuilder().addInterceptorLast( signingInterceptor );
return Optional.of( signingInterceptor );
}

private AwsCredentialsProvider createCredentialsProvider(BeanResolver beanResolver,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.backend.elasticsearch.aws.impl;

import java.io.IOException;

import org.hibernate.search.backend.elasticsearch.aws.logging.impl.AwsLog;
import org.hibernate.search.backend.elasticsearch.client.common.spi.ElasticsearchRequestInterceptor;

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.regions.Region;

class ElasticsearchAwsSigningRequestInterceptor implements ElasticsearchRequestInterceptor {

private final AwsV4HttpSigner signer;
private final Region region;
private final String service;
private final AwsCredentialsProvider credentialsProvider;

ElasticsearchAwsSigningRequestInterceptor(Region region, String service, AwsCredentialsProvider credentialsProvider) {
this.signer = AwsV4HttpSigner.create();
this.region = region;
this.service = service;
this.credentialsProvider = credentialsProvider;
}

@Override
public void intercept(RequestContext requestContext) throws IOException {
try ( HttpEntityContentStreamProvider contentStreamProvider =
HttpEntityContentStreamProvider.create( requestContext ) ) {
sign( requestContext, contentStreamProvider );
}
}

private void sign(RequestContext requestContext, HttpEntityContentStreamProvider contentStreamProvider) {
SdkHttpFullRequest awsRequest = toAwsRequest( requestContext, contentStreamProvider );

if ( AwsLog.INSTANCE.isTraceEnabled() ) {
AwsLog.INSTANCE.httpRequestBeforeSigning( requestContext );
AwsLog.INSTANCE.awsRequestBeforeSigning( awsRequest );
}

AwsCredentials credentials = credentialsProvider.resolveCredentials();
AwsLog.INSTANCE.awsCredentials( credentials );

SignedRequest signedRequest = signer.sign( r -> r.identity( credentials )
.request( awsRequest )
.payload( awsRequest.contentStreamProvider().orElse( null ) )
.putProperty( AwsV4HttpSigner.SERVICE_SIGNING_NAME, service )
.putProperty( AwsV4HttpSigner.REGION_NAME, region.id() ) );

// The AWS SDK added some headers.
// Let's just override the existing headers with whatever the AWS SDK came up with.
// We don't expect signing to affect anything else (path, query, content, ...).
requestContext.overrideHeaders( signedRequest.request().headers() );

if ( AwsLog.INSTANCE.isTraceEnabled() ) {
AwsLog.INSTANCE.httpRequestAfterSigning( signedRequest );
AwsLog.INSTANCE.awsRequestAfterSigning( requestContext );
}
}

private SdkHttpFullRequest toAwsRequest(RequestContext requestContext,
ContentStreamProvider contentStreamProvider) {
SdkHttpFullRequest.Builder awsRequestBuilder = SdkHttpFullRequest.builder();

awsRequestBuilder.host( requestContext.host() );
awsRequestBuilder.port( requestContext.port() );
awsRequestBuilder.protocol( requestContext.scheme() );

awsRequestBuilder.method( SdkHttpMethod.fromValue( requestContext.method() ) );

String path = requestContext.path();

// For some reason this is needed on Amazon OpenSearch Serverless
if ( "aoss".equals( service ) ) {
awsRequestBuilder.appendHeader( "x-amz-content-sha256", "required" );
}

awsRequestBuilder.encodedPath( path );
for ( var param : requestContext.queryParameters().entrySet() ) {
awsRequestBuilder.appendRawQueryParameter( param.getKey(), param.getValue() );
}

// Do NOT copy the headers, as the AWS SDK will sometimes sign some headers
// that are not properly taken into account by the AWS servers (e.g. content-length).

awsRequestBuilder.contentStreamProvider( contentStreamProvider );

return awsRequestBuilder.build();
}

}
Loading
Loading