From ff46a3e07f747c90874a28d52c957704000d88bc Mon Sep 17 00:00:00 2001 From: Gero Gerke Date: Thu, 21 Aug 2025 17:05:54 +0200 Subject: [PATCH] parametrize influxdb client version --- benches/client.rs | 5 +- influxdb/src/client/mod.rs | 32 ++++-- .../src/integrations/serde_integration/mod.rs | 2 +- influxdb/src/lib.rs | 2 +- influxdb/tests/derive_integration_tests.rs | 2 +- ...ation_tests.rs => integration_tests_v1.rs} | 100 +++++++++++------- influxdb/tests/integration_tests_v2.rs | 16 ++- influxdb/tests/utilities.rs | 45 ++++++-- 8 files changed, 143 insertions(+), 61 deletions(-) rename influxdb/tests/{integration_tests.rs => integration_tests_v1.rs} (88%) diff --git a/benches/client.rs b/benches/client.rs index e07bd1b..a83460d 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; use influxdb::Error; use influxdb::InfluxDbWriteable; +use influxdb::InfluxVersion1; use influxdb::{Client, ReadQuery}; use std::sync::Arc; use std::time::Instant; @@ -22,7 +23,7 @@ async fn main() { let number_of_total_requests = 20000; let concurrent_requests = 1000; - let client = Client::new(url, db_name); + let client: Client = Client::new(url, db_name); let concurrency_limit = Arc::new(Semaphore::new(concurrent_requests)); prepare_influxdb(&client, db_name).await; @@ -64,7 +65,7 @@ async fn main() { ); } -async fn prepare_influxdb(client: &Client, db_name: &str) { +async fn prepare_influxdb(client: &Client, db_name: &str) { let create_db_stmt = format!("CREATE DATABASE {}", db_name); client .query(&ReadQuery::new(create_db_stmt)) diff --git a/influxdb/src/client/mod.rs b/influxdb/src/client/mod.rs index b238a84..d776d18 100644 --- a/influxdb/src/client/mod.rs +++ b/influxdb/src/client/mod.rs @@ -23,18 +23,30 @@ use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; #[cfg(feature = "surf")] use surf::{Client as HttpClient, RequestBuilder, Response as HttpResponse}; +use std::marker::PhantomData; use crate::query::QueryType; use crate::Error; use crate::Query; +/// Marker type for InfluxDB Version 1 +#[derive(Clone)] +pub struct InfluxVersion1; +/// Marker type for InfluxDB Version 2 +#[derive(Clone)] +pub struct InfluxVersion2; +/// Marker type for InfluxDB Version 3 +#[derive(Clone)] +pub struct InfluxVersion3; + #[derive(Clone)] /// Internal Representation of a Client -pub struct Client { +pub struct Client { pub(crate) url: Arc, pub(crate) parameters: Arc>, pub(crate) token: Option, - pub(crate) client: HttpClient, + pub(crate) client: H, + _version: PhantomData, } struct RedactPassword<'a>(&'a HashMap<&'static str, String>); @@ -53,7 +65,7 @@ impl<'a> Debug for RedactPassword<'a> { } } -impl Debug for Client { +impl Debug for Client { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Client") .field("url", &self.url) @@ -62,7 +74,7 @@ impl Debug for Client { } } -impl Client { +impl Client { /// Instantiates a new [`Client`](crate::Client) /// /// # Arguments @@ -90,6 +102,7 @@ impl Client { parameters: Arc::new(parameters), client: HttpClient::new(), token: None, + _version: PhantomData, } } @@ -308,12 +321,15 @@ pub(crate) fn check_status(res: &HttpResponse) -> Result<(), Error> { #[cfg(test)] mod tests { + + use crate::client::InfluxVersion1; + use super::Client; use indoc::indoc; #[test] fn test_client_debug_redacted_password() { - let client = Client::new("https://localhost:8086", "db").with_auth("user", "pass"); + let client: Client = Client::new("https://localhost:8086", "db").with_auth("user", "pass"); let actual = format!("{client:#?}"); let expected = indoc! { r#" Client { @@ -331,14 +347,14 @@ mod tests { #[test] fn test_fn_database() { - let client = Client::new("http://localhost:8068", "database"); + let client: Client = Client::new("http://localhost:8068", "database"); assert_eq!(client.database_name(), "database"); assert_eq!(client.database_url(), "http://localhost:8068"); } #[test] fn test_with_auth() { - let client = Client::new("http://localhost:8068", "database"); + let client: Client = Client::new("http://localhost:8068", "database"); assert_eq!(client.parameters.len(), 1); assert_eq!(client.parameters.get("db").unwrap(), "database"); @@ -348,7 +364,7 @@ mod tests { assert_eq!(with_auth.parameters.get("u").unwrap(), "username"); assert_eq!(with_auth.parameters.get("p").unwrap(), "password"); - let client = Client::new("http://localhost:8068", "database"); + let client: Client = Client::new("http://localhost:8068", "database"); let with_auth = client.with_token("token"); assert_eq!(with_auth.parameters.len(), 1); assert_eq!(with_auth.parameters.get("db").unwrap(), "database"); diff --git a/influxdb/src/integrations/serde_integration/mod.rs b/influxdb/src/integrations/serde_integration/mod.rs index a6373f1..0167a9a 100644 --- a/influxdb/src/integrations/serde_integration/mod.rs +++ b/influxdb/src/integrations/serde_integration/mod.rs @@ -120,7 +120,7 @@ pub struct TaggedSeries { pub values: Vec, } -impl Client { +impl Client { pub async fn json_query(&self, q: ReadQuery) -> Result { let query = q.build().map_err(|err| Error::InvalidQueryError { error: err.to_string(), diff --git a/influxdb/src/lib.rs b/influxdb/src/lib.rs index 16303bd..3aec703 100644 --- a/influxdb/src/lib.rs +++ b/influxdb/src/lib.rs @@ -138,7 +138,7 @@ mod client; mod error; mod query; -pub use client::Client; +pub use client::{Client, InfluxVersion1, InfluxVersion2, InfluxVersion3}; pub use error::Error; pub use query::{ read_query::ReadQuery, diff --git a/influxdb/tests/derive_integration_tests.rs b/influxdb/tests/derive_integration_tests.rs index 6ecf612..eff6f81 100644 --- a/influxdb/tests/derive_integration_tests.rs +++ b/influxdb/tests/derive_integration_tests.rs @@ -10,7 +10,7 @@ use influxdb::{Query, ReadQuery, Timestamp}; #[cfg(feature = "serde")] use serde_derive::Deserialize; -use utilities::{assert_result_ok, create_client, create_db, delete_db, run_test}; +use utilities::{assert_result_ok, run_test}; #[derive(Debug, PartialEq)] #[cfg_attr(feature = "derive", derive(InfluxDbWriteable))] diff --git a/influxdb/tests/integration_tests.rs b/influxdb/tests/integration_tests_v1.rs similarity index 88% rename from influxdb/tests/integration_tests.rs rename to influxdb/tests/integration_tests_v1.rs index 289ac9c..4f53ad9 100644 --- a/influxdb/tests/integration_tests.rs +++ b/influxdb/tests/integration_tests_v1.rs @@ -5,7 +5,7 @@ mod utilities; use serde_derive::Deserialize; use utilities::{ - assert_result_err, assert_result_ok, create_client, create_db, delete_db, run_test, + assert_result_err, assert_result_ok, run_test, create_client_v1, create_db_v1, delete_db_v1 }; use influxdb::InfluxDbWriteable; @@ -17,7 +17,9 @@ use influxdb::{Client, Error, ReadQuery, Timestamp}; #[async_std::test] #[cfg(not(tarpaulin_include))] async fn test_ping_influx_db_async_std() { - let client = create_client("notusedhere"); + use influxdb::InfluxVersion1; + + let client: Client = create_client_v1("notusedhere"); let result = client.ping().await; assert_result_ok(&result); @@ -34,7 +36,7 @@ async fn test_ping_influx_db_async_std() { #[tokio::test] #[cfg(not(any(tarpaulin_include, feature = "hyper-client")))] async fn test_ping_influx_db_tokio() { - let client = create_client("notusedhere"); + let client = create_client_v1("notusedhere"); let result = client.ping().await; assert_result_ok(&result); @@ -51,8 +53,10 @@ async fn test_ping_influx_db_tokio() { #[async_std::test] #[cfg(not(tarpaulin_include))] async fn test_connection_error() { + use influxdb::InfluxVersion1; + let test_name = "test_connection_error"; - let client = + let client: Client = Client::new("http://127.0.0.1:10086", test_name).with_auth("nopriv_user", "password"); let read_query = ReadQuery::new("SELECT * FROM weather"); let read_result = client.query(read_query).await; @@ -76,7 +80,9 @@ async fn test_authed_write_and_read() { run_test( || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("CREATE DATABASE {TEST_NAME}"); client @@ -84,7 +90,7 @@ async fn test_authed_write_and_read() { .await .expect("could not setup db"); - let client = + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let write_query = Timestamp::Hours(11) .into_query("weather") @@ -101,7 +107,9 @@ async fn test_authed_write_and_read() { ); }, || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("DROP DATABASE {TEST_NAME}"); @@ -126,7 +134,9 @@ async fn test_wrong_authed_write_and_read() { run_test( || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("CREATE DATABASE {TEST_NAME}"); client @@ -134,7 +144,7 @@ async fn test_wrong_authed_write_and_read() { .await .expect("could not setup db"); - let client = + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("wrong_user", "password"); let write_query = Timestamp::Hours(11) .into_query("weather") @@ -160,7 +170,7 @@ async fn test_wrong_authed_write_and_read() { ), } - let client = Client::new("http://127.0.0.1:9086", TEST_NAME) + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME) .with_auth("nopriv_user", "password"); let read_query = ReadQuery::new("SELECT * FROM weather"); let read_result = client.query(read_query).await; @@ -174,7 +184,9 @@ async fn test_wrong_authed_write_and_read() { } }, || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("DROP DATABASE {TEST_NAME}"); client @@ -198,14 +210,16 @@ async fn test_non_authed_write_and_read() { run_test( || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("CREATE DATABASE {TEST_NAME}"); client .query(ReadQuery::new(query)) .await .expect("could not setup db"); - let non_authed_client = Client::new("http://127.0.0.1:9086", TEST_NAME); + let non_authed_client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME); let write_query = Timestamp::Hours(11) .into_query("weather") .add_field("temperature", 82); @@ -232,7 +246,9 @@ async fn test_non_authed_write_and_read() { } }, || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("DROP DATABASE {TEST_NAME}"); client @@ -254,8 +270,8 @@ async fn test_write_and_read_field() { run_test( || async move { - create_db(TEST_NAME).await.expect("could not setup db"); - let client = create_client(TEST_NAME); + create_db_v1(TEST_NAME).await.expect("could not setup db"); + let client = create_client_v1(TEST_NAME); let write_query = Timestamp::Hours(11) .into_query("weather") .add_field("temperature", 82); @@ -271,7 +287,7 @@ async fn test_write_and_read_field() { ); }, || async move { - delete_db(TEST_NAME).await.expect("could not clean up db"); + delete_db_v1(TEST_NAME).await.expect("could not clean up db"); }, ) .await; @@ -290,14 +306,16 @@ async fn test_json_non_authed_read() { run_test( || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("CREATE DATABASE {TEST_NAME}"); client .query(ReadQuery::new(query)) .await .expect("could not setup db"); - let non_authed_client = Client::new("http://127.0.0.1:9086", TEST_NAME); + let non_authed_client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME); let read_query = ReadQuery::new("SELECT * FROM weather"); let read_result = non_authed_client.json_query(read_query).await; @@ -311,7 +329,9 @@ async fn test_json_non_authed_read() { } }, || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("DROP DATABASE {TEST_NAME}"); @@ -335,7 +355,9 @@ async fn test_json_authed_read() { run_test( || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("CREATE DATABASE {TEST_NAME}"); client @@ -348,7 +370,9 @@ async fn test_json_authed_read() { assert_result_ok(&read_result); }, || async move { - let client = + use influxdb::InfluxVersion1; + + let client: Client = Client::new("http://127.0.0.1:9086", TEST_NAME).with_auth("admin", "password"); let query = format!("DROP DATABASE {TEST_NAME}"); @@ -373,9 +397,9 @@ async fn test_write_and_read_option() { run_test( || { async move { - create_db(TEST_NAME).await.expect("could not setup db"); + create_db_v1(TEST_NAME).await.expect("could not setup db"); - let client = create_client(TEST_NAME); + let client = create_client_v1(TEST_NAME); // Todo: Convert this to derive based insert for easier comparison of structs let write_query = Timestamp::Hours(11) .into_query("weather") @@ -411,7 +435,7 @@ async fn test_write_and_read_option() { } }, || async move { - delete_db("test_write_and_read_option") + delete_db_v1("test_write_and_read_option") .await .expect("could not clean up db"); }, @@ -431,9 +455,9 @@ async fn test_json_query() { run_test( || async move { - create_db(TEST_NAME).await.expect("could not setup db"); + create_db_v1(TEST_NAME).await.expect("could not setup db"); - let client = create_client(TEST_NAME); + let client = create_client_v1(TEST_NAME); let write_query = Timestamp::Hours(11) .into_query("weather") @@ -463,7 +487,7 @@ async fn test_json_query() { ); }, || async move { - delete_db(TEST_NAME).await.expect("could not clean up db"); + delete_db_v1(TEST_NAME).await.expect("could not clean up db"); }, ) .await; @@ -481,9 +505,9 @@ async fn test_json_query_tagged() { run_test( || async move { - create_db(TEST_NAME).await.expect("could not setup db"); + create_db_v1(TEST_NAME).await.expect("could not setup db"); - let client = create_client(TEST_NAME); + let client = create_client_v1(TEST_NAME); let write_query = Timestamp::Hours(11) .into_query("weather") @@ -525,7 +549,7 @@ async fn test_json_query_tagged() { ); }, || async move { - delete_db(TEST_NAME).await.expect("could not clean up db"); + delete_db_v1(TEST_NAME).await.expect("could not clean up db"); }, ) .await; @@ -546,9 +570,9 @@ async fn test_json_query_vec() { run_test( || async move { - create_db(TEST_NAME).await.expect("could not setup db"); + create_db_v1(TEST_NAME).await.expect("could not setup db"); - let client = create_client(TEST_NAME); + let client = create_client_v1(TEST_NAME); let write_query1 = Timestamp::Hours(11) .into_query("temperature_vec") .add_field("temperature", 16); @@ -578,7 +602,7 @@ async fn test_json_query_vec() { assert_eq!(result.unwrap().series[0].values.len(), 3); }, || async move { - delete_db(TEST_NAME).await.expect("could not clean up db"); + delete_db_v1(TEST_NAME).await.expect("could not clean up db"); }, ) .await; @@ -595,7 +619,7 @@ async fn test_serde_multi_query() { run_test( || async move { - create_db(TEST_NAME).await.expect("could not setup db"); + create_db_v1(TEST_NAME).await.expect("could not setup db"); #[derive(Deserialize, Debug, PartialEq)] struct Temperature { @@ -609,7 +633,7 @@ async fn test_serde_multi_query() { humidity: i32, } - let client = create_client(TEST_NAME); + let client = create_client_v1(TEST_NAME); let write_query = Timestamp::Hours(11) .into_query("temperature") .add_field("temperature", 16); @@ -652,7 +676,7 @@ async fn test_serde_multi_query() { ); }, || async move { - delete_db(TEST_NAME).await.expect("could not clean up db"); + delete_db_v1(TEST_NAME).await.expect("could not clean up db"); }, ) .await; @@ -665,7 +689,7 @@ async fn test_serde_multi_query() { #[cfg(feature = "serde")] #[cfg(not(tarpaulin_include))] async fn test_wrong_query_errors() { - let client = create_client("test_name"); + let client = create_client_v1("test_name"); let result = client .json_query(ReadQuery::new("CREATE DATABASE this_should_fail")) .await; diff --git a/influxdb/tests/integration_tests_v2.rs b/influxdb/tests/integration_tests_v2.rs index 8eb77ad..a84f604 100644 --- a/influxdb/tests/integration_tests_v2.rs +++ b/influxdb/tests/integration_tests_v2.rs @@ -15,7 +15,9 @@ use influxdb::{Client, Error, ReadQuery, Timestamp}; async fn test_authed_write_and_read() { run_test( || async move { - let client = Client::new("http://127.0.0.1:2086", "mydb").with_token("admintoken"); + use influxdb::InfluxVersion2; + + let client: Client = Client::new("http://127.0.0.1:2086", "mydb").with_token("admintoken"); let write_query = Timestamp::Hours(11) .into_query("weather") .add_field("temperature", 82); @@ -31,7 +33,9 @@ async fn test_authed_write_and_read() { ); }, || async move { - let client = Client::new("http://127.0.0.1:2086", "mydb").with_token("admintoken"); + use influxdb::InfluxVersion2; + + let client: Client = Client::new("http://127.0.0.1:2086", "mydb").with_token("admintoken"); let read_query = ReadQuery::new("DROP MEASUREMENT \"weather\""); let read_result = client.query(read_query).await; assert_result_ok(&read_result); @@ -51,7 +55,9 @@ async fn test_wrong_authed_write_and_read() { run_test( || async move { - let client = Client::new("http://127.0.0.1:2086", "mydb").with_token("falsetoken"); + use influxdb::InfluxVersion2; + + let client: Client = Client::new("http://127.0.0.1:2086", "mydb").with_token("falsetoken"); let write_query = Timestamp::Hours(11) .into_query("weather") .add_field("temperature", 82); @@ -91,7 +97,9 @@ async fn test_non_authed_write_and_read() { run_test( || async move { - let non_authed_client = Client::new("http://127.0.0.1:2086", "mydb"); + use influxdb::InfluxVersion2; + + let non_authed_client: Client = Client::new("http://127.0.0.1:2086", "mydb"); let write_query = Timestamp::Hours(11) .into_query("weather") .add_field("temperature", 82); diff --git a/influxdb/tests/utilities.rs b/influxdb/tests/utilities.rs index dc3b17b..d2146dd 100644 --- a/influxdb/tests/utilities.rs +++ b/influxdb/tests/utilities.rs @@ -1,4 +1,6 @@ use futures_util::FutureExt; +#[cfg(not(tarpaulin_include))] +use influxdb::{InfluxVersion1, InfluxVersion2}; use influxdb::{Client, Error, ReadQuery}; use std::future::Future; use std::panic::{AssertUnwindSafe, UnwindSafe}; @@ -16,33 +18,64 @@ pub fn assert_result_ok(result: &Result< #[allow(dead_code)] #[cfg(not(tarpaulin_include))] -pub fn create_client(db_name: T) -> Client +pub fn create_client_v1(db_name: T) -> Client +where + T: Into, +{ + Client::::new("http://127.0.0.1:8086", db_name) +} + +#[allow(dead_code)] +#[cfg(not(tarpaulin_include))] +pub fn create_client_v2(bucket: T) -> Client +where + T: Into, +{ + Client::::new("http://127.0.0.1:8086", bucket) +} + +#[allow(dead_code)] +#[cfg(not(tarpaulin_include))] +pub async fn create_db_v1(name: T) -> Result where T: Into, { - Client::new("http://127.0.0.1:8086", db_name) + let test_name = name.into(); + let query = format!("CREATE DATABASE {test_name}"); + create_client_v1(test_name).query(ReadQuery::new(query)).await } #[allow(dead_code)] #[cfg(not(tarpaulin_include))] -pub async fn create_db(name: T) -> Result +pub async fn create_db_v2(name: T) -> Result where T: Into, { let test_name = name.into(); let query = format!("CREATE DATABASE {test_name}"); - create_client(test_name).query(ReadQuery::new(query)).await + create_client_v2(test_name).query(ReadQuery::new(query)).await +} + +#[allow(dead_code)] +#[cfg(not(tarpaulin_include))] +pub async fn delete_db_v1(name: T) -> Result +where + T: Into, +{ + let test_name = name.into(); + let query = format!("DROP DATABASE {test_name}"); + create_client_v1(test_name).query(ReadQuery::new(query)).await } #[allow(dead_code)] #[cfg(not(tarpaulin_include))] -pub async fn delete_db(name: T) -> Result +pub async fn delete_db_v2(name: T) -> Result where T: Into, { let test_name = name.into(); let query = format!("DROP DATABASE {test_name}"); - create_client(test_name).query(ReadQuery::new(query)).await + create_client_v2(test_name).query(ReadQuery::new(query)).await } #[cfg(not(tarpaulin_include))]