
Add configuration options to control table creation restrictions #1503

Open · wants to merge 6 commits into base: main
ConfigOptions.java
@@ -122,6 +122,24 @@ public class ConfigOptions {
"The interval of auto partition check. "
+ "The default value is 10 minutes.");

public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
key("allow.create.log.tables")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to allow creation of log tables. When set to false, "
+ "attempts to create log tables (tables without primary key) will be rejected. "
+ "The default value is true.");

public static final ConfigOption<Boolean> KV_TABLE_ALLOW_CREATION =
key("allow.create.kv.tables")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to allow creation of kv tables (primary key tables). When set to false, "
+ "attempts to create kv tables (tables with primary key) will be rejected. "
+ "The default value is true.");

public static final ConfigOption<Integer> MAX_PARTITION_NUM =
key("max.partition.num")
.intType()
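
For illustration, a minimal sketch of how an operator could apply the new options when building a cluster configuration. It reuses the Configuration API and the option keys defined above (allow.create.log.tables / allow.create.kv.tables); where this configuration is handed to the server, and the imports for Configuration and ConfigOptions, are assumptions.

// Sketch only: disable log table creation for the cluster.
Configuration conf = new Configuration();
// Reject CREATE TABLE for log tables (tables without a primary key).
conf.set(ConfigOptions.LOG_TABLE_ALLOW_CREATION, false);
// Primary key (kv) tables remain allowed, since allow.create.kv.tables defaults to true.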
CoordinatorService.java
@@ -134,6 +134,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina

private final int defaultBucketNumber;
private final int defaultReplicationFactor;
private final boolean logTableAllowCreation;
private final boolean kvTableAllowCreation;
private final Supplier<EventManager> eventManagerSupplier;
private final Supplier<Integer> coordinatorEpochSupplier;
private final ServerMetadataCache metadataCache;
@@ -156,6 +158,8 @@ public CoordinatorService(
super(remoteFileSystem, ServerType.COORDINATOR, zkClient, metadataManager, authorizer);
this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
this.logTableAllowCreation = conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);
this.kvTableAllowCreation = conf.getBoolean(ConfigOptions.KV_TABLE_ALLOW_CREATION);
this.eventManagerSupplier =
() -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager();
this.coordinatorEpochSupplier =
@@ -244,6 +248,9 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
}
}

// Check table creation permissions based on table type
validateTableCreationPermission(tableDescriptor, tablePath);

// apply system defaults if the config is not set
tableDescriptor = applySystemDefaults(tableDescriptor);

@@ -650,4 +657,30 @@ private static List<BucketMetadata> getBucketMetadataFromContext(
});
return bucketMetadataList;
}

/**
* Validates whether the table creation is allowed based on the table type and configuration.
*
* @param tableDescriptor the table descriptor to validate
* @param tablePath the table path for error reporting
* @throws InvalidTableException if table creation is not allowed
*/
private void validateTableCreationPermission(
TableDescriptor tableDescriptor, TablePath tablePath) {
boolean hasPrimaryKey = tableDescriptor.hasPrimaryKey();

if (hasPrimaryKey) {
// This is a KV table (Primary Key Table)
if (!kvTableAllowCreation) {
throw new InvalidTableException(
"Creation of Primary Key Tables is disallowed in the cluster.");
}
} else {
// This is a Log table
if (!logTableAllowCreation) {
throw new InvalidTableException(
"Creation of Log Tables is disallowed in the cluster.");
}
}
}
}
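
For illustration, a minimal sketch of how the new check surfaces to a caller of the coordinator gateway. The gateway and request-helper names mirror the test code later in this diff; the surrounding setup and imports are assumed.

// Sketch only: with allow.create.kv.tables set to false on the cluster,
// creating a primary key table completes exceptionally.
try {
    adminGateway.createTable(newCreateTableRequest(tablePath, newPkTable(), false)).get();
} catch (ExecutionException e) {
    // e.getCause() is an InvalidTableException with the message
    // "Creation of Primary Key Tables is disallowed in the cluster."
}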
TableManagerITCase.java
@@ -126,12 +126,58 @@ class TableManagerITCase {
.setClusterConf(initConf())
.build();

// Extension for testing log table creation restriction
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_LOG_RESTRICTED_EXTENSION =
FlussClusterExtension.builder()
.setNumOfTabletServers(3)
.setCoordinatorServerListeners(
String.format(
"%s://localhost:0, %s://localhost:0",
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
.setTabletServerListeners(
String.format(
"%s://localhost:0, %s://localhost:0",
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
.setClusterConf(initLogRestrictedConf())
.build();

// Extension for testing kv table creation restriction
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_KV_RESTRICTED_EXTENSION =
FlussClusterExtension.builder()
.setNumOfTabletServers(3)
.setCoordinatorServerListeners(
String.format(
"%s://localhost:0, %s://localhost:0",
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
.setTabletServerListeners(
String.format(
"%s://localhost:0, %s://localhost:0",
DEFAULT_LISTENER_NAME, CLIENT_LISTENER))
.setClusterConf(initKvRestrictedConf())
.build();

private static Configuration initConf() {
Configuration conf = new Configuration();
conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL, Duration.ofSeconds(1));
return conf;
}

private static Configuration initLogRestrictedConf() {
Configuration conf = new Configuration();
conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL, Duration.ofSeconds(1));
conf.set(ConfigOptions.LOG_TABLE_ALLOW_CREATION, false);
return conf;
}

private static Configuration initKvRestrictedConf() {
Configuration conf = new Configuration();
conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL, Duration.ofSeconds(1));
conf.set(ConfigOptions.KV_TABLE_ALLOW_CREATION, false);
return conf;
}

@BeforeEach
void setup() {
zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
@@ -157,7 +203,7 @@ void testCreateInvalidDatabaseAndTable() {
.createTable(
newCreateTableRequest(
new TablePath("db", "=invalid_table!"),
newTable(),
newPkTable(),
true))
.get())
.cause()
@@ -170,7 +216,7 @@ void testCreateInvalidDatabaseAndTable() {
.createTable(
newCreateTableRequest(
new TablePath("", "=invalid_table!"),
newTable(),
newPkTable(),
true))
.get())
.cause()
@@ -272,7 +318,7 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception {
adminGateway.dropTable(newDropTableRequest(db1, tb1, true)).get();

// then create a table
TableDescriptor tableDescriptor = newTable();
TableDescriptor tableDescriptor = newPkTable();
adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();

// the table should exist then
@@ -453,7 +499,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception {
TablePath tablePath = TablePath.of(db1, tb1);
// first create a database
adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();
TableDescriptor tableDescriptor = newTable();
TableDescriptor tableDescriptor = newPkTable();
adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, false)).get();
GetTableInfoResponse response =
gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
@@ -715,7 +761,7 @@ private static void checkAssignmentWithReplicaFactor(
}

private static TableDescriptor newTableWithoutSettingDistribution() {
return TableDescriptor.builder().schema(newSchema()).comment("first table").build();
return TableDescriptor.builder().schema(newPkSchema()).comment("first table").build();
}

private static TableDescriptor newPartitionedTable() {
@@ -749,15 +795,15 @@ private static TableDescriptor.Builder newPartitionedTableBuilder(
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 1);
}

private static TableDescriptor newTable() {
private static TableDescriptor newPkTable() {
return TableDescriptor.builder()
.schema(newSchema())
.schema(newPkSchema())
.comment("first table")
.distributedBy(3, "a")
.build();
}

private static Schema newSchema() {
private static Schema newPkSchema() {
return Schema.newBuilder()
.column("a", DataTypes.INT())
.withComment("a comment")
@@ -798,4 +844,118 @@ private UpdateMetadataRequest makeLegacyUpdateMetadataRequest(
});
return updateMetadataRequest;
}

// Test methods for table creation restrictions

@Test
void testLogTableCreationRestriction() throws Exception {
// Test with cluster that disallows log table creation
AdminGateway logRestrictedAdminGateway =
FLUSS_CLUSTER_LOG_RESTRICTED_EXTENSION.newCoordinatorClient();

String db1 = "db1";
String tb1 = "log_table";
TablePath tablePath = TablePath.of(db1, tb1);

// Create database first
logRestrictedAdminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();

// Try to create a log table (table without primary key), should fail
TableDescriptor logTableDescriptor = newLogTable();
assertThatThrownBy(
() ->
logRestrictedAdminGateway
.createTable(
newCreateTableRequest(
tablePath, logTableDescriptor, false))
.get())
.cause()
.isInstanceOf(InvalidTableException.class)
.hasMessageContaining("Creation of Log Tables is disallowed in the cluster.");

// Try to create a kv table (table with primary key), should succeed
String tb2 = "kv_table";
TablePath kvTablePath = TablePath.of(db1, tb2);
TableDescriptor kvTableDescriptor = newPkTable();
logRestrictedAdminGateway
.createTable(newCreateTableRequest(kvTablePath, kvTableDescriptor, false))
.get();

// Verify the kv table was created successfully
assertThat(
logRestrictedAdminGateway
.tableExists(newTableExistsRequest(kvTablePath))
.get()
.isExists())
.isTrue();
}

@Test
void testKvTableCreationRestriction() throws Exception {
// Test with cluster that disallows kv table creation
AdminGateway kvRestrictedAdminGateway =
FLUSS_CLUSTER_KV_RESTRICTED_EXTENSION.newCoordinatorClient();

String db1 = "db1";
String tb1 = "kv_table";
TablePath tablePath = TablePath.of(db1, tb1);

// Create database first
kvRestrictedAdminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get();

// Try to create a kv table (table with primary key), should fail
TableDescriptor kvTableDescriptor = newPkTable();
assertThatThrownBy(
() ->
kvRestrictedAdminGateway
.createTable(
newCreateTableRequest(
tablePath, kvTableDescriptor, false))
.get())
.cause()
.isInstanceOf(InvalidTableException.class)
.hasMessageContaining(
"Creation of Primary Key Tables is disallowed in the cluster.");

// Try to create a log table (table without primary key), should succeed
String tb2 = "log_table";
TablePath logTablePath = TablePath.of(db1, tb2);
TableDescriptor logTableDescriptor = newLogTable();
kvRestrictedAdminGateway
.createTable(newCreateTableRequest(logTablePath, logTableDescriptor, false))
.get();

// Verify the log table was created successfully
assertThat(
kvRestrictedAdminGateway
.tableExists(newTableExistsRequest(logTablePath))
.get()
.isExists())
.isTrue();
}

// Helper methods for creating different table types
private static TableDescriptor newLogTable() {
return TableDescriptor.builder()
.schema(newLogSchema())
.comment("log table without primary key")
.distributedBy(3, "a")
.build();
}

private static Schema newLogSchema() {
return Schema.newBuilder()
.column("a", DataTypes.INT())
.withComment("a comment")
.column("b", DataTypes.STRING())
.build();
}
}