Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 310 additions & 52 deletions glide-core/src/socket_listener.rs

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions node/rust-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use bytes::Bytes;
use glide_core::MAX_REQUEST_ARGS_LENGTH;
use glide_core::client::ConnectionError;
use glide_core::client::get_or_init_runtime;
use glide_core::release_socket_listener;
use glide_core::start_socket_listener;
use napi::bindgen_prelude::BigInt;
use napi::bindgen_prelude::Either;
Expand Down Expand Up @@ -195,6 +196,11 @@ pub fn start_socket_listener_external(env: Env) -> Result<JsObject> {
Ok(promise)
}

#[napi(js_name = "ReleaseSocketConnection")]
pub fn release_socket_listener_external(path: String) {
release_socket_listener(&path);
}

#[napi(js_name = "InitOpenTelemetry")]
pub fn init_open_telemetry(open_telemetry_config: OpenTelemetryConfig) -> Result<()> {
// At least one of traces or metrics must be provided
Expand Down
70 changes: 66 additions & 4 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import {
RangeByIndex,
RangeByLex,
RangeByScore,
ReleaseSocketConnection,
RequestError,
RestoreOptions,
RouteOption,
Expand Down Expand Up @@ -978,6 +979,8 @@ type WritePromiseOptions =
*/
export class BaseClient {
private socket: net.Socket;
private socketPath?: string;
private socketListenerReleased = false;
protected readonly promiseCallbackFunctions:
| [PromiseFunction, ErrorFunction, Decoder | undefined][]
| [PromiseFunction, ErrorFunction][] = [];
Expand Down Expand Up @@ -1259,6 +1262,32 @@ export class BaseClient {
options?.inflightRequestsLimit ?? DEFAULT_INFLIGHT_REQUESTS_LIMIT;
}

protected setSocketPath(path: string): void {
this.socketPath = path;
this.socketListenerReleased = false;
}

protected releaseSocketListener(): void {
if (this.socketListenerReleased || !this.socketPath) {
return;
}

try {
ReleaseSocketConnection(this.socketPath);
} catch (error) {
// Log the error but don't throw - we still need to clear local state
Logger.log(
"warn",
"Socket cleanup",
`Error releasing socket connection: ${error}`,
);
} finally {
// Always clear local state to ensure idempotency and prevent inconsistent state
this.socketListenerReleased = true;
this.socketPath = undefined;
}
}

protected getCallbackIndex(): number {
return (
this.availableCallbackSlots.pop() ??
Expand Down Expand Up @@ -9165,6 +9194,16 @@ export class BaseClient {
});
Logger.log("info", "Client lifetime", "disposing of client");
this.socket.end();

try {
this.releaseSocketListener();
} catch (error) {
Logger.log(
"debug",
"Client lifetime",
`Error releasing socket listener during close: ${error}`,
);
}
}

/**
Expand Down Expand Up @@ -9210,17 +9249,40 @@ export class BaseClient {
) => TConnection,
): Promise<TConnection> {
const path = await StartSocketConnection();
const socket = await this.GetSocket(path);
let socket: net.Socket;

try {
return await this.__createClientInternal<TConnection>(
socket = await this.GetSocket(path);
} catch (err) {
ReleaseSocketConnection(path);
throw err;
}

try {
const connection = await this.__createClientInternal<TConnection>(
options,
socket,
constructor,
);
connection.setSocketPath(path);
return connection;
} catch (err) {
// Ensure socket is closed
socket.end();
// Ensure socket is fully closed and listener is released
try {
socket.end();
socket.destroy();
} catch {
// Ignore socket cleanup errors to avoid masking the original error
}

try {
if (ReleaseSocketConnection) {
ReleaseSocketConnection(path);
}
} catch {
// Ignore release errors to avoid masking the original error
}

throw err;
}
}
Expand Down
11 changes: 7 additions & 4 deletions node/tests/AuthTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,14 @@ describe("Auth tests", () => {
);
});

// Skip IAM Auth tests in CI/CD environments
// Skip IAM Auth tests unless explicitly enabled (and never run them in CI by default)
const describeIamTests =
process.env.CI || process.env.GITHUB_ACTIONS || process.env.JENKINS_URL
? describe.skip
: describe;
process.env.ENABLE_IAM_AUTH_TESTS === "true" &&
!process.env.CI &&
!process.env.GITHUB_ACTIONS &&
!process.env.JENKINS_URL
? describe
: describe.skip;

describeIamTests("IAM Auth: Elasticache Cluster", () => {
it("test_iam_authentication_elasticache_cluster", async () => {
Expand Down
27 changes: 17 additions & 10 deletions node/tests/OpenTelemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/

import { afterAll, afterEach, beforeAll, describe } from "@jest/globals";
import {
afterAll,
afterEach,
beforeAll,
describe,
expect,
it,
} from "@jest/globals";
import * as fs from "fs";
import ValkeyCluster from "../../utils/TestUtils";
import {
Expand Down Expand Up @@ -237,7 +244,7 @@ describe("OpenTelemetry GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test span memory leak_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
if (global.gc) {
global.gc(); // Run garbage collection
}
Expand Down Expand Up @@ -272,7 +279,7 @@ describe("OpenTelemetry GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test percentage requests config_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
const client = await GlideClusterClient.createClient({
...getClientConfigurationOption(
cluster.getAddresses(),
Expand Down Expand Up @@ -327,7 +334,7 @@ describe("OpenTelemetry GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test otel global config not reinitialize_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
const openTelemetryConfig: OpenTelemetryConfig = {
traces: {
endpoint: "wrong.endpoint",
Expand Down Expand Up @@ -361,7 +368,7 @@ describe("OpenTelemetry GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test span transaction memory leak_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
if (global.gc) {
global.gc(); // Run garbage collection
}
Expand Down Expand Up @@ -402,7 +409,7 @@ describe("OpenTelemetry GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test number of clients with same config_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
const client1 = await GlideClusterClient.createClient({
...getClientConfigurationOption(
cluster.getAddresses(),
Expand Down Expand Up @@ -437,7 +444,7 @@ describe("OpenTelemetry GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test span batch file_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
if (global.gc) {
global.gc(); // Run garbage collection
}
Expand Down Expand Up @@ -522,7 +529,7 @@ describe("OpenTelemetry GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient test automatic span lifecycle_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
if (global.gc) {
global.gc(); // Run garbage collection
}
Expand Down Expand Up @@ -555,7 +562,7 @@ describe("OpenTelemetry GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"GlideClient test otel global config not reinitialize_%p",
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient({
...getClientConfigurationOption(
cluster.getAddresses(),
Expand All @@ -576,7 +583,7 @@ describe("OpenTelemetry GlideClient", () => {

it.each([ProtocolVersion.RESP3, ProtocolVersion.RESP2])(
`GlideClient test concurrent commands span lifecycle_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
if (global.gc) {
global.gc(); // Run garbage collection
}
Expand Down
35 changes: 21 additions & 14 deletions node/tests/ScanTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/

import { afterAll, afterEach, beforeAll, describe } from "@jest/globals";
import {
afterAll,
afterEach,
beforeAll,
describe,
expect,
it,
} from "@jest/globals";
import { ValkeyCluster } from "../../utils/TestUtils.js";
import {
ClusterScanCursor,
Expand Down Expand Up @@ -57,7 +64,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient test basic cluster scan_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClusterClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -128,7 +135,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient simple scan with encoding %p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClusterClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -162,7 +169,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with object type and pattern%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClusterClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -210,7 +217,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with count%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClusterClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -246,7 +253,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with match%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClusterClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -294,7 +301,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with different types%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClusterClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -381,7 +388,7 @@ describe("Scan GlideClusterClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with allowNonCoveredSlots %p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
const testCluster = await ValkeyCluster.createCluster(
true,
3,
Expand Down Expand Up @@ -573,7 +580,7 @@ describe("Scan GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient test basic scan_%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -645,7 +652,7 @@ describe("Scan GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient simple scan with encoding %p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -679,7 +686,7 @@ describe("Scan GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient scan with object type and pattern%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -727,7 +734,7 @@ describe("Scan GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient scan with count%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -763,7 +770,7 @@ describe("Scan GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient scan with match%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down Expand Up @@ -811,7 +818,7 @@ describe("Scan GlideClient", () => {

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClient scan with different types%p`,
async (protocol) => {
async (protocol: ProtocolVersion) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);
Expand Down
Loading
Loading