Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions libs/lib-mongodb/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ export function createMongoClient(config: BaseMongoConfigDecoded, options?: Mong
password: normalized.password
},
// Time for connection to timeout
connectTimeoutMS: MONGO_CONNECT_TIMEOUT_MS,
connectTimeoutMS: normalized.connectTimeoutMS ?? MONGO_CONNECT_TIMEOUT_MS,
// Time for individual requests to timeout
socketTimeoutMS: MONGO_SOCKET_TIMEOUT_MS,
socketTimeoutMS: normalized.socketTimeoutMS ?? MONGO_SOCKET_TIMEOUT_MS,
// How long to wait for new primary selection
serverSelectionTimeoutMS: 30_000,
serverSelectionTimeoutMS: normalized.serverSelectionTimeoutMS ?? 30_000,

// Identify the client
appName: options?.powersyncVersion ? `powersync-storage ${options.powersyncVersion}` : 'powersync-storage',
Expand All @@ -73,10 +73,10 @@ export function createMongoClient(config: BaseMongoConfigDecoded, options?: Mong
// Avoid too many connections:
// 1. It can overwhelm the source database.
// 2. Processing too many queries in parallel can cause the process to run out of memory.
maxPoolSize: options?.maxPoolSize ?? 8,
maxPoolSize: normalized.maxPoolSize ?? options?.maxPoolSize ?? 8,

maxConnecting: 3,
maxIdleTimeMS: 60_000
maxIdleTimeMS: normalized.maxIdleTimeMS ?? 60_000
});
}

Expand Down
34 changes: 32 additions & 2 deletions libs/lib-mongodb/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ export const BaseMongoConfig = t.object({
username: t.string.optional(),
password: t.string.optional(),

reject_ip_ranges: t.array(t.string).optional()
reject_ip_ranges: t.array(t.string).optional(),

connectTimeoutMS: t.number.optional(),
socketTimeoutMS: t.number.optional(),
serverSelectionTimeoutMS: t.number.optional(),
maxPoolSize: t.number.optional(),
maxIdleTimeMS: t.number.optional()
});

export type BaseMongoConfig = t.Encoded<typeof BaseMongoConfig>;
Expand All @@ -29,6 +35,11 @@ export type NormalizedMongoConfig = {
username: string;
password: string;
lookup: LookupFunction | undefined;
connectTimeoutMS?: number;
socketTimeoutMS?: number;
serverSelectionTimeoutMS?: number;
maxPoolSize?: number;
maxIdleTimeMS?: number;
};

/**
Expand Down Expand Up @@ -70,6 +81,19 @@ export function normalizeMongoConfig(options: BaseMongoConfigDecoded): Normalize
throw new ServiceError(ErrorCode.PSYNC_S1105, `MongoDB connection: database required`);
}

const parseQueryParam = (key: string): number | undefined => {
const value = uri.searchParams.get(key);
if (value == null) return undefined;
const num = Number(value);
if (isNaN(num) || num < 0) return undefined;
return num;
};
const connectTimeoutMS = options.connectTimeoutMS ?? parseQueryParam('connectTimeoutMS');
const socketTimeoutMS = options.socketTimeoutMS ?? parseQueryParam('socketTimeoutMS');
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? parseQueryParam('serverSelectionTimeoutMS');
const maxPoolSize = options.maxPoolSize ?? parseQueryParam('maxPoolSize');
const maxIdleTimeMS = options.maxIdleTimeMS ?? parseQueryParam('maxIdleTimeMS');

const lookupOptions: LookupOptions = {
reject_ip_ranges: options.reject_ip_ranges ?? []
};
Expand All @@ -82,6 +106,12 @@ export function normalizeMongoConfig(options: BaseMongoConfigDecoded): Normalize
username,
password,

lookup
lookup,

connectTimeoutMS,
socketTimeoutMS,
serverSelectionTimeoutMS,
maxPoolSize,
maxIdleTimeMS
};
}
46 changes: 44 additions & 2 deletions libs/lib-mongodb/test/src/config.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { describe, expect, test } from 'vitest';
import { normalizeMongoConfig } from '../../src/types/types.js';
import { LookupAddress } from 'node:dns';
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';

import { normalizeMongoConfig } from '../../src/types/types.js';

describe('config', () => {
test('Should normalize a simple URI', () => {
const uri = 'mongodb://localhost:27017/powersync_test';
Expand Down Expand Up @@ -34,6 +34,48 @@ describe('config', () => {
expect(normalized.database).equals('powersync_test');
});

test('Should parse connection parameters from URI query string', () => {
const normalized = normalizeMongoConfig({
type: 'mongodb',
uri: 'mongodb://user:pass@host/powersync_test?connectTimeoutMS=10000&socketTimeoutMS=60000&serverSelectionTimeoutMS=30000&maxPoolSize=10&maxIdleTimeMS=120000'
});
expect(normalized.connectTimeoutMS).equals(10000);
expect(normalized.socketTimeoutMS).equals(60000);
expect(normalized.serverSelectionTimeoutMS).equals(30000);
expect(normalized.maxPoolSize).equals(10);
expect(normalized.maxIdleTimeMS).equals(120000);
});

test('Should prioritize explicit config over URI query params', () => {
const normalized = normalizeMongoConfig({
type: 'mongodb',
uri: 'mongodb://user:pass@host/powersync_test?connectTimeoutMS=10000&maxPoolSize=10',
connectTimeoutMS: 20000,
maxPoolSize: 20
});
expect(normalized.connectTimeoutMS).equals(20000);
expect(normalized.maxPoolSize).equals(20);
});

test('Should handle partial query parameters', () => {
const normalized = normalizeMongoConfig({
type: 'mongodb',
uri: 'mongodb://user:pass@host/powersync_test?connectTimeoutMS=10000'
});
expect(normalized.connectTimeoutMS).equals(10000);
expect(normalized.socketTimeoutMS).toBeUndefined();
expect(normalized.maxPoolSize).toBeUndefined();
});

test('Should ignore invalid query parameter values', () => {
const normalized = normalizeMongoConfig({
type: 'mongodb',
uri: 'mongodb://user:pass@host/powersync_test?connectTimeoutMS=invalid&maxPoolSize=-5'
});
expect(normalized.connectTimeoutMS).toBeUndefined();
expect(normalized.maxPoolSize).toBeUndefined();
});

test('Should normalize a replica set URI', () => {
const uri =
'mongodb://mongodb-0.mongodb.powersync.svc.cluster.local:27017,mongodb-1.mongodb.powersync.svc.cluster.local:27017,mongodb-2.mongodb.powersync.svc.cluster.local:27017/powersync_test?replicaSet=rs0';
Expand Down
32 changes: 30 additions & 2 deletions libs/lib-postgres/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ export const BasePostgresConnectionConfig = t.object({
*/
slot_name_prefix: t.string.optional(),

max_pool_size: t.number.optional()
max_pool_size: t.number.optional(),

connect_timeout: t.number.optional(),
keepalives: t.number.optional(),
keepalives_idle: t.number.optional(),
keepalives_interval: t.number.optional(),
keepalives_count: t.number.optional()
});

export type BasePostgresConnectionConfig = t.Encoded<typeof BasePostgresConnectionConfig>;
Expand Down Expand Up @@ -83,6 +89,22 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD
const username = options.username ?? uri_username ?? '';
const password = options.password ?? uri_password ?? '';

const queryString = uri.query ?? (options.uri && options.uri.includes('?') ? options.uri.split('?')[1].split('#')[0] : '');
const queryParams = new URLSearchParams(queryString);
const parseQueryParam = (key: string): number | undefined => {
const value = queryParams.get(key);
if (value == null) return undefined;
const num = Number(value);
if (isNaN(num) || num < 0) return undefined;
return num;
};

const connect_timeout = options.connect_timeout ?? parseQueryParam('connect_timeout');
const keepalives = options.keepalives ?? parseQueryParam('keepalives');
const keepalives_idle = options.keepalives_idle ?? parseQueryParam('keepalives_idle');
const keepalives_interval = options.keepalives_interval ?? parseQueryParam('keepalives_interval');
const keepalives_count = options.keepalives_count ?? parseQueryParam('keepalives_count');

const sslmode = options.sslmode ?? 'verify-full'; // Configuration not supported via URI
const cacert = options.cacert;

Expand Down Expand Up @@ -131,7 +153,13 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD
client_certificate: options.client_certificate ?? undefined,
client_private_key: options.client_private_key ?? undefined,

max_pool_size: options.max_pool_size ?? 8
max_pool_size: options.max_pool_size ?? 8,

connect_timeout,
keepalives,
keepalives_idle,
keepalives_interval,
keepalives_count
} satisfies NormalizedBasePostgresConnectionConfig;
}

Expand Down
137 changes: 136 additions & 1 deletion libs/lib-postgres/test/src/config.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,147 @@
import { describe, expect, test } from 'vitest';

import { normalizeConnectionConfig } from '../../src/types/types.js';

describe('config', () => {
test('Should resolve database', () => {
test('Should normalize a simple URI', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://postgres:postgres@localhost:4321/powersync_test'
});
expect(normalized.database).equals('powersync_test');
expect(normalized.hostname).equals('localhost');
expect(normalized.port).equals(4321);
expect(normalized.username).equals('postgres');
expect(normalized.password).equals('postgres');
});

test('Should normalize an URI with auth', () => {
const uri = 'postgresql://user:pass@localhost:5432/powersync_test';
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri
});
expect(normalized.database).equals('powersync_test');
expect(normalized.username).equals('user');
expect(normalized.password).equals('pass');
});

test('Should normalize an URI with query params', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@host/db?other=param'
});
expect(normalized.database).equals('db');
});

test('Should prioritize username and password that are specified explicitly', () => {
const uri = 'postgresql://user:pass@localhost:5432/powersync_test';
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri,
username: 'user2',
password: 'pass2'
});
expect(normalized.username).equals('user2');
expect(normalized.password).equals('pass2');
});

test('Should parse connection parameters from URI query string', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@host/db?connect_timeout=300&keepalives=1&keepalives_idle=60&keepalives_interval=10&keepalives_count=10'
});
expect(normalized.connect_timeout).equals(300);
expect(normalized.keepalives).equals(1);
expect(normalized.keepalives_idle).equals(60);
expect(normalized.keepalives_interval).equals(10);
expect(normalized.keepalives_count).equals(10);
});

test('Should prioritize explicit config over URI query params', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@host/db?connect_timeout=300&keepalives_idle=60',
connect_timeout: 600,
keepalives_idle: 120
});
expect(normalized.connect_timeout).equals(600);
expect(normalized.keepalives_idle).equals(120);
});

test('Should handle partial query parameters', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@host/db?connect_timeout=300'
});
expect(normalized.connect_timeout).equals(300);
expect(normalized.keepalives).toBeUndefined();
expect(normalized.keepalives_idle).toBeUndefined();
});

test('Should ignore invalid query parameter values', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@host/db?connect_timeout=invalid&keepalives_idle=-5'
});
expect(normalized.connect_timeout).toBeUndefined();
expect(normalized.keepalives_idle).toBeUndefined();
});

test('Should handle keepalives=0 to disable keepalives', () => {
const normalized = normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@host/db?keepalives=0'
});
expect(normalized.keepalives).equals(0);
});

describe('errors', () => {
test('Should throw error when no database specified', () => {
['postgresql://user:pass@localhost:5432', 'postgresql://user:pass@localhost:5432/'].forEach((uri) => {
expect(() =>
normalizeConnectionConfig({
type: 'postgresql',
uri
})
).toThrow('[PSYNC_S1105] Postgres connection: database required');
});
});

test('Should throw error when URI has invalid scheme', () => {
expect(() =>
normalizeConnectionConfig({
type: 'postgresql',
uri: 'http://user:pass@localhost:5432/powersync_test'
})
).toThrow('[PSYNC_S1109] Invalid URI - protocol must be postgresql');
});

test('Should throw error when hostname is missing', () => {
expect(() =>
normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user:pass@/powersync_test'
})
).toThrow('[PSYNC_S1106] Postgres connection: hostname required');
});

test('Should throw error when username is missing', () => {
expect(() =>
normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://localhost:5432/powersync_test'
})
).toThrow('[PSYNC_S1107] Postgres connection: username required');
});

test('Should throw error when password is missing', () => {
expect(() =>
normalizeConnectionConfig({
type: 'postgresql',
uri: 'postgresql://user@localhost:5432/powersync_test'
})
).toThrow('[PSYNC_S1108] Postgres connection: password required');
});
});
});
10 changes: 5 additions & 5 deletions modules/module-mongodb/src/replication/MongoManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ export class MongoManager extends BaseObserver<MongoManagerListener> {

lookup: options.lookup,
// Time for connection to timeout
connectTimeoutMS: 5_000,
connectTimeoutMS: options.connectTimeoutMS ?? 5_000,
// Time for individual requests to timeout
socketTimeoutMS: 60_000,
socketTimeoutMS: options.socketTimeoutMS ?? 60_000,
// How long to wait for new primary selection
serverSelectionTimeoutMS: 30_000,
serverSelectionTimeoutMS: options.serverSelectionTimeoutMS ?? 30_000,

// Identify the client
appName: `powersync ${POWERSYNC_VERSION}`,
Expand All @@ -47,10 +47,10 @@ export class MongoManager extends BaseObserver<MongoManagerListener> {
// Avoid too many connections:
// 1. It can overwhelm the source database.
// 2. Processing too many queries in parallel can cause the process to run out of memory.
maxPoolSize: 8,
maxPoolSize: options.maxPoolSize ?? 8,

maxConnecting: 3,
maxIdleTimeMS: 60_000,
maxIdleTimeMS: options.maxIdleTimeMS ?? 60_000,

...BSON_DESERIALIZE_DATA_OPTIONS,

Expand Down
6 changes: 6 additions & 0 deletions modules/module-mongodb/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ export interface NormalizedMongoConnectionConfig {
lookup?: LookupFunction;

postImages: PostImagesOption;

connectTimeoutMS?: number;
socketTimeoutMS?: number;
serverSelectionTimeoutMS?: number;
maxPoolSize?: number;
maxIdleTimeMS?: number;
}

export const MongoConnectionConfig = lib_mongo.BaseMongoConfig.and(service_types.configFile.DataSourceConfig).and(
Expand Down
Loading