Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 10 additions & 45 deletions src/main/java/org/mariadb/r2dbc/MariadbConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ public final class MariadbConnection implements org.mariadb.r2dbc.api.MariadbCon
private final Logger logger = Loggers.getLogger(this.getClass());
private final Client client;
private final MariadbConnectionConfiguration configuration;
private volatile IsolationLevel sessionIsolationLevel;
private volatile IsolationLevel isolationLevel;
private volatile String database;

public MariadbConnection(
Client client, IsolationLevel isolationLevel, MariadbConnectionConfiguration configuration) {
Client client, MariadbConnectionConfiguration configuration) {
this.client = Assert.requireNonNull(client, "client must not be null");
this.sessionIsolationLevel =
Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
this.configuration = Assert.requireNonNull(configuration, "configuration must not be null");
this.database = configuration.getDatabase();
}
Expand All @@ -48,19 +44,6 @@ public Mono<Void> beginTransaction() {
public Mono<Void> beginTransaction(TransactionDefinition definition) {
Mono<Void> request = Mono.empty();

// set isolation level for next transaction if set
IsolationLevel isoLevel = definition.getAttribute(TransactionDefinition.ISOLATION_LEVEL);
if (isoLevel != null && !isoLevel.equals(getTransactionIsolationLevel())) {
String sql = String.format("SET TRANSACTION ISOLATION LEVEL %s", isoLevel.asSql());
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
request =
client
.sendCommand(new QueryPacket(sql), true)
.handle(exceptionFactory::handleErrorResponse)
.then()
.doOnSuccess(ignore -> this.isolationLevel = isoLevel);
}

return request.then(this.client.beginTransaction(definition));
}

Expand All @@ -71,7 +54,7 @@ public Mono<Void> close() {

@Override
public Mono<Void> commitTransaction() {
return this.client.commitTransaction().doOnSuccess(i -> this.isolationLevel = null);
return this.client.commitTransaction();
}

@Override
Expand Down Expand Up @@ -105,11 +88,7 @@ public MariadbConnectionMetadata getMetadata() {

@Override
public IsolationLevel getTransactionIsolationLevel() {
if (isolationLevel != null) return isolationLevel;
if ((client.getContext().getClientCapabilities() & Capabilities.CLIENT_SESSION_TRACK) > 0
&& client.getContext().getIsolationLevel() != null)
return client.getContext().getIsolationLevel();
return this.sessionIsolationLevel;
return IsolationLevel.READ_COMMITTED;
}

@Override
Expand Down Expand Up @@ -149,7 +128,7 @@ public int getPort() {

@Override
public Mono<Void> rollbackTransaction() {
return this.client.rollbackTransaction().then().doOnSuccess(i -> this.isolationLevel = null);
return this.client.rollbackTransaction().then();
}

@Override
Expand All @@ -161,8 +140,7 @@ public Mono<Void> rollbackTransactionToSavepoint(String name) {
public Mono<Void> setAutoCommit(boolean autoCommit) {
return client
.setAutoCommit(autoCommit)
.then()
.doOnSuccess(i -> this.isolationLevel = autoCommit ? null : this.isolationLevel);
.then();
}

@Override
Expand Down Expand Up @@ -208,31 +186,18 @@ public Mono<Void> setStatementTimeout(Duration timeout) {
@Override
public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
if (isolationLevel != IsolationLevel.READ_COMMITTED) {
throw new IllegalArgumentException(
"SingleStore supports only the READ_COMMITTED isolation level");
}

if ((client.getContext().getClientCapabilities() & Capabilities.CLIENT_SESSION_TRACK) > 0
&& client.getContext().getIsolationLevel() != null
&& client.getContext().getIsolationLevel().equals(isolationLevel)) return Mono.empty();

String sql =
String.format("SET SESSION TRANSACTION ISOLATION LEVEL %s", isolationLevel.asSql());
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
final IsolationLevel newIsolation = isolationLevel;
return client
.sendCommand(new QueryPacket(sql), true)
.handle(exceptionFactory::handleErrorResponse)
.then()
.doOnSuccess(ignore -> this.sessionIsolationLevel = newIsolation);
return Mono.empty();
}

@Override
public String toString() {
return "MariadbConnection{client="
+ client
+ ", isolationLevel="
+ ((client.getContext().getClientCapabilities() | Capabilities.CLIENT_SESSION_TRACK) > 0
&& client.getContext().getIsolationLevel() != null
? client.getContext().getIsolationLevel()
: sessionIsolationLevel)
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.netty.handler.ssl.SslContextBuilder;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.IsolationLevel;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.URLDecoder;
Expand Down Expand Up @@ -55,7 +54,6 @@ public final class MariadbConnectionConfiguration {
private final String[] restrictedAuth;
private final LoopResources loopResources;
private final UnaryOperator<SslContextBuilder> sslContextBuilderCustomizer;
private IsolationLevel isolationLevel;
private final boolean skipPostCommands;

private MariadbConnectionConfiguration(
Expand Down Expand Up @@ -86,7 +84,6 @@ private MariadbConnectionConfiguration(
@Nullable String cachingRsaPublicKey,
boolean allowPublicKeyRetrieval,
boolean useServerPrepStmts,
IsolationLevel isolationLevel,
Boolean autocommit,
boolean permitRedirect,
boolean skipPostCommands,
Expand All @@ -103,7 +100,6 @@ private MariadbConnectionConfiguration(
this.tcpAbortiveClose = tcpAbortiveClose == null ? Boolean.FALSE : tcpAbortiveClose;
this.transactionReplay = transactionReplay == null ? Boolean.FALSE : transactionReplay;
this.database = database != null && !database.isEmpty() ? database : null;
this.isolationLevel = isolationLevel;
this.restrictedAuth = restrictedAuth != null ? restrictedAuth.split(",") : null;
if (hostAddresses != null) {
this.hostAddresses = hostAddresses;
Expand Down Expand Up @@ -362,13 +358,6 @@ public static Builder fromOptions(ConnectionFactoryOptions connectionFactoryOpti
connectionFactoryOptions.getValue(
MariadbConnectionFactoryProvider.USE_SERVER_PREPARE)));
}
if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.ISOLATION_LEVEL)) {
String isolationLvl =
(String)
connectionFactoryOptions.getValue(MariadbConnectionFactoryProvider.ISOLATION_LEVEL);
builder.isolationLevel(
isolationLvl == null ? null : IsolationLevel.valueOf(isolationLvl.replace("-", " ")));
}

if (connectionFactoryOptions.hasOption(MariadbConnectionFactoryProvider.AUTO_COMMIT)) {
Object value =
Expand Down Expand Up @@ -517,14 +506,6 @@ public static Builder builder() {
return new Builder();
}

public IsolationLevel getIsolationLevel() {
return isolationLevel;
}

private void setIsolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
}

@Nullable
public Duration getConnectTimeout() {
return this.connectTimeout;
Expand Down Expand Up @@ -766,14 +747,6 @@ public String toString() {
sb.append(field.getName()).append('=');
sb.append(obj);
}
} else if (field.getType().equals(IsolationLevel.class)) {
Object defaultValue = field.get(defaultConf);
if (!obj.equals(defaultValue)) {
sb.append(first ? '?' : '&');
first = false;
sb.append("isolationLevel=");
sb.append(((IsolationLevel) obj).asSql().replace(" ", "-"));
}
} else if (field.getType().equals(Map.class)) {
Object defaultValue = field.get(defaultConf);
if (!obj.equals(defaultValue)) {
Expand Down Expand Up @@ -840,7 +813,6 @@ public static final class Builder implements Cloneable {
private boolean allowMultiQueries = false;
private boolean allowPipelining = true;
private boolean useServerPrepStmts = false;
private IsolationLevel isolationLevel = null;
private Boolean autocommit = Boolean.TRUE;
private boolean permitRedirect = true;
private boolean skipPostCommands = false;
Expand Down Expand Up @@ -911,7 +883,6 @@ private MariadbConnectionConfiguration build(boolean checkMandatory) {
this.cachingRsaPublicKey,
this.allowPublicKeyRetrieval,
this.useServerPrepStmts,
this.isolationLevel,
this.autocommit,
this.permitRedirect,
this.skipPostCommands,
Expand Down Expand Up @@ -1184,17 +1155,6 @@ public Builder useServerPrepStmts(boolean useServerPrepStmts) {
return this;
}

/**
* Permit to set default isolation level
*
* @param isolationLevel transaction isolation level
* @return this {@link Builder}
*/
public Builder isolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
return this;
}

/**
* Permit to indicate default autocommit value. Default value True.
*
Expand Down Expand Up @@ -1227,7 +1187,6 @@ public Builder permitRedirect(boolean permitRedirect) {
* <ul>
* <li>connection exchanges to be UT8(mb3/mb4)
* <li>autocommit set to true
* <li>transaction isolation defaulting to REPEATABLE-READ
* </ul>
*
* Default value False.
Expand Down Expand Up @@ -1385,8 +1344,6 @@ public String toString() {
+ ", timezone="
+ timezone
+ ", prepareCacheSize="
+ isolationLevel
+ ", isolationLevel="
+ prepareCacheSize
+ ", tlsProtocol="
+ tlsProtocol
Expand Down
30 changes: 1 addition & 29 deletions src/main/java/org/mariadb/r2dbc/MariadbConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,7 @@ public static Mono<Void> setSessionVariables(
}

// set default transaction isolation
String txIsolation =
(client.getVersion().isMariaDBServer()
&& client.getVersion().versionGreaterOrEqual(11, 1, 1))
|| (!client.getVersion().isMariaDBServer()
&& (client.getVersion().versionGreaterOrEqual(8, 0, 3)
|| (client.getVersion().getMajorVersion() < 8
&& client.getVersion().versionGreaterOrEqual(5, 7, 20))))
? "transaction_isolation"
: "tx_isolation";
sql.append(",")
.append(txIsolation)
.append("='")
.append(
configuration.getIsolationLevel() == null
? "REPEATABLE-READ"
: configuration.getIsolationLevel().asSql().replace(" ", "-"))
.append("'");
String txIsolation = "tx_isolation";

// set session tracking
if ((client.getContext().getClientCapabilities() & Capabilities.CLIENT_SESSION_TRACK) > 0) {
Expand All @@ -196,15 +180,6 @@ public static Mono<Void> setSessionVariables(
.append(txIsolation)
.append("')))");
}

;

client
.getContext()
.setIsolationLevel(
configuration.getIsolationLevel() == null
? IsolationLevel.REPEATABLE_READ
: configuration.getIsolationLevel());
}

// set session variables if defined
Expand Down Expand Up @@ -292,9 +267,6 @@ configuration, new DomainSocketAddress(configuration.getSocket()), null, lock)
Mono.just(
new MariadbConnection(
client,
configuration.getIsolationLevel() == null
? IsolationLevel.REPEATABLE_READ
: configuration.getIsolationLevel(),
configuration))
.onErrorResume(throwable -> closeWithError(client, throwable)))
.cast(org.mariadb.r2dbc.api.MariadbConnection.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public final class MariadbConnectionFactoryProvider implements ConnectionFactory

public static final Option<Boolean> ALLOW_PIPELINING = Option.valueOf("allowPipelining");
public static final Option<Boolean> USE_SERVER_PREPARE = Option.valueOf("useServerPrepStmts");
public static final Option<String> ISOLATION_LEVEL = Option.valueOf("isolationLevel");
public static final Option<Boolean> AUTO_COMMIT = Option.valueOf("autocommit");
public static final Option<Boolean> PERMIT_REDIRECT = Option.valueOf("permitRedirect");
public static final Option<Boolean> SKIP_POST_COMMANDS = Option.valueOf("skipPostCommands");
Expand Down
76 changes: 0 additions & 76 deletions src/main/java/org/mariadb/r2dbc/MariadbTransactionDefinition.java

This file was deleted.

Loading