From 09cd3d4dee0453ec686d435dcfb7b5e5185e4a76 Mon Sep 17 00:00:00 2001 From: Joshua Ford Date: Tue, 21 Oct 2025 15:17:29 -0500 Subject: [PATCH] fix(okta): Source does not update `since` query every interval * Update `since` query every interval to bring in new logs. * Remove dangery support for following header links. * Rewrite the Okta implementation to look more like an HTTP Client source. * Add Okta failure modes to the integration tests. --- src/sources/okta/client.rs | 438 ++++++++++++------------------------- src/sources/okta/tests.rs | 410 +++++++++++++++++++++------------- 2 files changed, 404 insertions(+), 444 deletions(-) diff --git a/src/sources/okta/client.rs b/src/sources/okta/client.rs index c13e7a5cefe27..a2391c203d0bb 100644 --- a/src/sources/okta/client.rs +++ b/src/sources/okta/client.rs @@ -1,42 +1,39 @@ -use std::{sync::Arc, time::Duration}; +use std::collections::HashMap; +use std::time::Duration; use bytes::{Bytes, BytesMut}; use chrono::Utc; -use futures::StreamExt as _; -use futures_util::{FutureExt, Stream, stream}; +use futures_util::FutureExt; use http::Uri; -use hyper::{Body, Request}; +use http::response::Parts as ResponseParts; +use http::uri::Parts as UriParts; use percent_encoding::utf8_percent_encode; use serde_with::serde_as; -use tokio::sync::Mutex; -use tokio_stream::wrappers::IntervalStream; use tokio_util::codec::Decoder as _; use vector_lib::{ - EstimatedJsonEncodedSizeOf, codecs::{ - JsonDeserializerConfig, StreamDecodingError, + StreamDecodingError, decoding::{DeserializerConfig, FramingConfig}, }, - config::{LogNamespace, SourceOutput, proxy::ProxyConfig}, + config::LogNamespace, configurable::configurable_component, event::Event, - json_size::JsonSize, - shutdown::ShutdownSignal, - tls::TlsConfig, }; use crate::{ - SourceSender, + Result, codecs::{Decoder, DecodingConfig}, - config::{SourceConfig, SourceContext}, - http::{HttpClient, HttpError}, - internal_events::{ - DecoderDeserializeError, EndpointBytesReceived, HttpClientEventsReceived, - HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError, - }, + config::{SourceConfig, SourceContext, SourceOutput}, + serde::{default_decoding, default_framing_message_based}, sources, - sources::util::http_client::{default_interval, default_timeout, warn_if_interval_too_low}, - tls::TlsSettings, + sources::util::{ + http::HttpMethod, + http_client::{ + GenericHttpClientInputs, HttpClientBuilder, HttpClientContext, call, default_interval, + default_timeout, warn_if_interval_too_low, + }, + }, + tls::{TlsConfig, TlsSettings}, }; /// Configuration for the `okta` source. @@ -44,6 +41,16 @@ use crate::{ #[configurable_component(source("okta", "Pull Okta system logs via the Okta API",))] #[derive(Clone, Debug)] pub struct OktaConfig { + /// Decoder to use on each received message. + #[configurable(derived)] + #[serde(default = "default_decoding")] + pub decoding: DeserializerConfig, + + /// Framing to use in the decoding. + #[configurable(derived)] + #[serde(default = "default_framing_message_based")] + pub framing: FramingConfig, + /// The Okta subdomain to scrape #[configurable(metadata(docs::examples = "foo.okta.com"))] pub domain: String, @@ -61,6 +68,11 @@ pub struct OktaConfig { #[configurable(metadata(docs::human_name = "Scrape Interval"))] pub interval: Duration, + /// The time to look back for logs. This is used to determine the start time of the first + /// request (that is, the earliest log to fetch) + #[configurable(metadata(docs::human_name = "Since (seconds before now)"))] + pub since: Option, + /// The timeout for each scrape request. #[serde(default = "default_timeout")] #[serde_as(as = "serde_with::DurationSecondsWithFrac")] @@ -68,11 +80,6 @@ pub struct OktaConfig { #[configurable(metadata(docs::human_name = "Scrape Timeout"))] pub timeout: Duration, - /// The time to look back for logs. This is used to determine the start time of the first request - /// (that is, the earliest log to fetch) - #[configurable(metadata(docs::human_name = "Since (seconds before now)"))] - pub since: Option, - /// TLS configuration. #[configurable(derived)] pub tls: Option, @@ -86,6 +93,8 @@ pub struct OktaConfig { impl Default for OktaConfig { fn default() -> Self { Self { + decoding: default_decoding(), + framing: default_framing_message_based(), domain: "".to_string(), token: "".to_string(), interval: default_interval(), @@ -99,34 +108,82 @@ impl Default for OktaConfig { impl_generate_config_from_default!(OktaConfig); -fn find_rel_next_link(header: &str) -> Option { - for part in header.split(',') { - let relpart: Vec<_> = part.split(';').collect(); - if let Some(url) = relpart - .first() - .map(|s| s.trim().trim_matches(|c| c == '<' || c == '>')) - && part.contains("rel=\"next\"") - { - return Some(url.to_string()); +/// Request-specific context for Okta API scraping. +#[derive(Clone)] +struct OktaContext { + decoder: Decoder, + interval: Duration, + since: Option, +} + +impl OktaContext { + /// Decode the events from the byte buffer + fn decode_events(&mut self, buf: &mut BytesMut) -> Vec { + let mut events = Vec::new(); + loop { + match self.decoder.decode_eof(buf) { + Ok(Some((next, _))) => { + events.extend(next); + } + Ok(None) => break, + Err(error) => { + // Error is logged by `crate::codecs::Decoder`, no further + // handling is needed here. + if !error.can_continue() { + break; + } + break; + } + } } + events } - None } -#[async_trait::async_trait] -#[typetag::serde(name = "okta")] -impl SourceConfig for OktaConfig { - async fn build(&self, cx: SourceContext) -> crate::Result { +impl HttpClientBuilder for OktaContext { + type Context = OktaContext; + + fn build(&self, _uri: &Uri) -> Self::Context { + self.clone() + } +} + +impl HttpClientContext for OktaContext { + fn on_response( + &mut self, + _url: &Uri, + _header: &ResponseParts, + body: &Bytes, + ) -> Option> { + let mut buf = BytesMut::new(); + buf.extend_from_slice(body); + + let events = self.decode_events(&mut buf); + + Some(events) + } + + /// Retrieve the next batch of events for the interval window. + fn process_url(&self, url: &Uri) -> Option { + let mut url_parts = UriParts::from(url.clone()); let since = match self.since { Some(since) => Utc::now() - Duration::from_secs(since), - _ => Utc::now(), + _ => Utc::now() - self.interval, }; - let path_and_query = format!( "/api/v1/logs?since={}", utf8_percent_encode(&since.to_rfc3339(), percent_encoding::NON_ALPHANUMERIC) ); + url_parts.path_and_query = Some(path_and_query.parse().ok()?); + + Uri::from_parts(url_parts).ok() + } +} +#[async_trait::async_trait] +#[typetag::serde(name = "okta")] +impl SourceConfig for OktaConfig { + async fn build(&self, cx: SourceContext) -> Result { let mut url_parts = Uri::try_from(&self.domain) .map_err(|_| { format!( @@ -136,36 +193,52 @@ impl SourceConfig for OktaConfig { })? .into_parts(); - url_parts.path_and_query = Some(path_and_query.parse()?); if url_parts.scheme.is_none() { url_parts.scheme = Some(http::uri::Scheme::HTTPS); } + url_parts.path_and_query = Some("/".parse()?); - let url = Uri::from_parts(url_parts).map_err(|_| { - format!( - "Invalid domain: {}. Must be a valid Okta subdomain.", - self.domain - ) - })?; + let urls = vec![Uri::from_parts(url_parts)?]; let tls = TlsSettings::from_options(self.tls.as_ref())?; + let decoding = self.decoding.clone(); + let framing = self.framing.clone(); let log_namespace = cx.log_namespace(self.log_namespace); + let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?; + + let context = OktaContext { + decoder, + interval: self.interval, + since: self.since, + }; + warn_if_interval_too_low(self.timeout, self.interval); - Ok(run( - url, + let mut headers = HashMap::new(); + headers.insert( + http::header::AUTHORIZATION.to_string(), + vec![format!("SSWS {0}", self.token).to_string()], + ); + headers.insert( + http::header::ACCEPT.to_string(), + vec!["application/json".to_string()], + ); + + let inputs = GenericHttpClientInputs { + urls, + interval: self.interval, + timeout: self.timeout, + headers, + content_type: "application/json".to_string(), + auth: None, tls, - cx.proxy, - self.token.clone(), - self.interval, - self.timeout, - log_namespace, - cx.shutdown, - cx.out, - ) - .boxed()) + proxy: cx.proxy.clone(), + shutdown: cx.shutdown, + }; + + Ok(call(inputs, context, cx.out, HttpMethod::Get).boxed()) } fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { @@ -173,9 +246,14 @@ impl SourceConfig for OktaConfig { // and is merged here. let log_namespace = global_log_namespace.merge(self.log_namespace); + let schema_definition = self + .decoding + .schema_definition(log_namespace) + .with_standard_vector_source_metadata(); + vec![SourceOutput::new_maybe_logs( - JsonDeserializerConfig::default().output_type(), - JsonDeserializerConfig::default().schema_definition(log_namespace), + self.decoding.output_type(), + schema_definition, )] } @@ -183,239 +261,3 @@ impl SourceConfig for OktaConfig { false } } - -fn enrich_events(events: &mut Vec, log_namespace: LogNamespace) { - let now = Utc::now(); - for event in events { - log_namespace.insert_standard_vector_source_metadata( - event.as_mut_log(), - OktaConfig::NAME, - now, - ); - } -} - -type OktaRunResult = - Result<(http::response::Parts, Bytes, Option), Box>; - -type OktaTimeoutResult = - Result, HttpError>, tokio::time::error::Elapsed>; - -async fn run_once(url: String, result: OktaTimeoutResult, timeout: Duration) -> OktaRunResult { - let mut next: Option = None; - match result { - Ok(Ok(response)) => { - let (header, body) = response.into_parts(); - if let Some(next_url) = header - .headers - .get_all("link") - .iter() - .filter_map(|v| v.to_str().ok()) - .filter_map(find_rel_next_link) - .next() - .and_then(|next| Uri::try_from(next).ok()) - { - next = Some(next_url); - }; - - let body = hyper::body::to_bytes(body).await?; - - emit!(EndpointBytesReceived { - byte_size: body.len(), - protocol: "http", - endpoint: &url, - }); - Ok((header, body, next)) - } - Ok(Err(error)) => Err(error.into()), - Err(_) => Err(format!("Timeout error: request exceeded {}s", timeout.as_secs_f64()).into()), - } -} - -fn handle_response( - response: OktaRunResult, - decoder: Decoder, - log_namespace: LogNamespace, - url: String, -) -> Option + Send + use<>> { - match response { - Ok((header, body, _)) if header.status == hyper::StatusCode::OK => { - let mut buf = BytesMut::new(); - buf.extend_from_slice(&body); - let mut events = decode_events(&mut buf, decoder); - let byte_size = if events.is_empty() { - JsonSize::zero() - } else { - events.estimated_json_encoded_size_of() - }; - - emit!(HttpClientEventsReceived { - byte_size, - count: events.len(), - url, - }); - - if events.is_empty() { - return None; - } - - enrich_events(&mut events, log_namespace); - - Some(stream::iter(events)) - } - Ok((header, _, _)) => { - emit!(HttpClientHttpResponseError { - code: header.status, - url, - }); - None - } - Err(error) => { - emit!(HttpClientHttpError { error, url }); - None - } - } -} - -/// Calls the Okta system logs API and sends the events to the output stream. -/// -/// Okta's API paginates with a `link` header that contains a url (in `rel=next`) to the next page of results, -/// and will always return a `rel=next` link regardless of whether there are more results. -/// This function fetches all pages until there are no more results (an empty JSON array) and finishes until -/// the next interval -/// The function will run until the `shutdown` signal is received. -#[allow(clippy::too_many_arguments)] // internal function -async fn run( - url: Uri, - tls: TlsSettings, - proxy: ProxyConfig, - token: String, - interval: Duration, - timeout: Duration, - log_namespace: LogNamespace, - shutdown: ShutdownSignal, - mut out: SourceSender, -) -> Result<(), ()> { - let url_mutex = Arc::new(Mutex::new(url.clone())); - let decoder = DecodingConfig::new( - FramingConfig::Bytes, - DeserializerConfig::Json(JsonDeserializerConfig::default()), - log_namespace, - ) - .build() - .map_err(|ref e| { - emit!(DecoderDeserializeError { error: e }); - })?; - - let client = HttpClient::new(tls, &proxy).map_err(|e| { - emit!(HttpClientHttpError { - error: Box::new(e), - url: url.to_string() - }); - })?; - - let mut stream = IntervalStream::new(tokio::time::interval(interval)) - .take_until(shutdown) - .then(move |_| { - let client = client.clone(); - let url_mutex = Arc::clone(&url_mutex); - let token = token.clone(); - let decoder = decoder.clone(); - - async move { - stream::unfold((), move |_| { - let url_mutex = Arc::clone(&url_mutex); - let token = token.clone(); - let decoder = decoder.clone(); - let client = client.clone(); - - async move { - let (run_url, response): (String, OktaRunResult) = { - // We update the actual URL based on the response the API returns - // so the critical section is between here & when the request finishes - let mut url_lock = url_mutex.lock().await; - let url = url_lock.to_string(); - - let mut request = match Request::get(&url).body(Body::empty()) { - Ok(request) => request, - Err(e) => { - emit!(HttpClientHttpError { - error: e.into(), - url: url.clone(), - }); - return None; - } - }; - - let headers = request.headers_mut(); - headers.insert( - http::header::AUTHORIZATION, - format!("SSWS {token}").parse().unwrap(), - ); - headers - .insert(http::header::ACCEPT, "application/json".parse().unwrap()); - headers.insert( - http::header::CONTENT_TYPE, - "application/json".parse().unwrap(), - ); - - let client = client.clone(); - let response = tokio::time::timeout(timeout, client.send(request)) - .then({ - let url = url.clone(); - move |result| run_once(url, result, timeout) - }) - .await; - - if let Ok((_, _, Some(ref next))) = response { - *url_lock = next.clone(); - } - let new_url = url_lock.to_string(); - - (new_url, response) - }; - - handle_response(response, decoder, log_namespace, run_url) - .map(|events| (events, ())) - } - }) - .flatten() - .boxed() - } - }) - .flatten_unordered(None) - .boxed(); - - match out.send_event_stream(&mut stream).await { - Ok(()) => { - debug!("Finished sending."); - Ok(()) - } - Err(_) => { - let (count, _) = stream.size_hint(); - emit!(StreamClosedError { count }); - Err(()) - } - } -} - -fn decode_events(buf: &mut BytesMut, mut decoder: Decoder) -> Vec { - let mut events = Vec::new(); - loop { - match decoder.decode_eof(buf) { - Ok(Some((next, _))) => { - events.extend(next); - } - Ok(None) => break, - Err(error) => { - // Error is logged by `crate::codecs::Decoder`, no further - // handling is needed here. - if !error.can_continue() { - break; - } - break; - } - } - } - events -} diff --git a/src/sources/okta/tests.rs b/src/sources/okta/tests.rs index f4c0f7ee4d7dd..2c0a9eada5e86 100644 --- a/src/sources/okta/tests.rs +++ b/src/sources/okta/tests.rs @@ -1,18 +1,18 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; +use std::collections::HashMap; +use http::{Response, StatusCode}; use tokio::time::Duration; use vector_lib::{config::LogNamespace, event::Event}; use warp::Filter; use crate::{ components::validation::prelude::*, - config::log_schema, sources::okta::OktaConfig, test_util::{ - components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance}, + components::{ + COMPONENT_ERROR_TAGS, HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance, + run_and_assert_source_error, + }, next_addr, test_generate_config, wait_for_tcp, }, }; @@ -21,10 +21,10 @@ pub(crate) const INTERVAL: Duration = Duration::from_secs(10); pub(crate) const TIMEOUT: Duration = Duration::from_secs(1); -/// The happy path should yield at least one event and must emit the required internal events for sources. +/// Run queries against an Okta endpoint and verify compliance. pub(crate) async fn run_compliance(config: OktaConfig) -> Vec { let events = - run_and_assert_source_compliance(config, Duration::from_secs(5), &HTTP_PULL_SOURCE_TAGS) + run_and_assert_source_compliance(config, Duration::from_secs(3), &HTTP_PULL_SOURCE_TAGS) .await; assert!(!events.is_empty()); @@ -32,6 +32,168 @@ pub(crate) async fn run_compliance(config: OktaConfig) -> Vec { events } +/// The error path should not yield any events and must emit the required error internal events. +pub(crate) async fn run_error(config: OktaConfig) { + let events = + run_and_assert_source_error(config, Duration::from_secs(3), &COMPONENT_ERROR_TAGS).await; + + assert!(events.is_empty()); +} + +const OKTA_200_EMPTY: &str = r#"[]"#; +const OKTA_200_RESPONSE: &str = r#" +[ + { + "actor": { + "id": "00uttidj01jqL21aM1d6", + "type": "User", + "alternateId": "john.doe@example.com", + "displayName": "John Doe", + "detailEntry": null + }, + "client": { + "userAgent": { + "rawUserAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36", + "os": "Mac OS X", + "browser": "CHROME" + }, + "zone": null, + "device": "Computer", + "id": null, + "ipAddress": "10.0.0.1", + "geographicalContext": { + "city": "New York", + "state": "New York", + "country": "United States", + "postalCode": 10013, + "geolocation": { + "lat": 40.3157, + "lon": -74.01 + } + } + }, + "device": { + "id": "guofdhyjex1feOgbN1d9", + "name": "Mac15,6", + "os_platform": "OSX", + "os_version": "14.6.0", + "managed": false, + "registered": true, + "device_integrator": null, + "disk_encryption_type": "ALL_INTERNAL_VOLUMES", + "screen_lock_type": "BIOMETRIC", + "jailbreak": null, + "secure_hardware_present": true + }, + "authenticationContext": { + "authenticationProvider": null, + "credentialProvider": null, + "credentialType": null, + "issuer": null, + "interface": null, + "authenticationStep": 0, + "rootSessionId": "idxBager62CSveUkTxvgRtonA", + "externalSessionId": "idxBager62CSveUkTxvgRtonA" + }, + "displayMessage": "User login to Okta", + "eventType": "user.session.start", + "outcome": { + "result": "SUCCESS", + "reason": null + }, + "published": "2024-08-13T15:58:20.353Z", + "securityContext": { + "asNumber": 394089, + "asOrg": "ASN 0000", + "isp": "google", + "domain": null, + "isProxy": false + }, + "severity": "INFO", + "debugContext": { + "debugData": { + "requestId": "ab609228fe84ce59cdcbfa690bcce016", + "requestUri": "/idp/idx/authenticators/poll", + "url": "/idp/idx/authenticators/poll" + } + }, + "legacyEventType": "core.user_auth.login_success", + "transaction": { + "type": "WEB", + "id": "ab609228fe84ce59cdcbfa690bgce016", + "detail": null + }, + "uuid": "dc9fd3c0-598c-11ef-8478-2b7584bf8d5a", + "version": 0, + "request": { + "ipChain": [ + { + "ip": "10.0.0.1", + "geographicalContext": { + "city": "New York", + "state": "New York", + "country": "United States", + "postalCode": 10013, + "geolocation": { + "lat": 40.3157, + "lon": -74.01 + } + }, + "version": "V4", + "source": null + } + ] + }, + "target": [ + { + "id": "pfdfdhyjf0HMbkP2e1d7", + "type": "AuthenticatorEnrollment", + "alternateId": "unknown", + "displayName": "Okta Verify", + "detailEntry": null + }, + { + "id": "0oatxlef9sQvvqInq5d6", + "type": "AppInstance", + "alternateId": "Okta Admin Console", + "displayName": "Okta Admin Console", + "detailEntry": null + } + ] + } +] +"#; + +const OKTA_400_VALIDATION_FAILED: &str = r#" +{ + "errorCode": "E0000001", + "errorSummary": "Api validation failed: {0}", + "errorLink": "E0000001", + "errorId": "sampleiCF-8D5rLW6myqiPItW", + "errorCauses": [] +} +"#; + +const OKTA_403_ACCESS_DENIED: &str = r#" +{ + "errorCode": "E0000006", + "errorSummary": "You do not have permission to perform the requested action", + "errorLink": "E0000006", + "errorId": "sampleNUSD_8fdkFd8fs8SDBK", + "errorCauses": [] +} +"#; + +const OKTA_429_RATE_LIMIT_EXCEEDED: &str = r#" +{ + "errorCode": "E0000047", + "errorSummary": "API call exceeded rate limit due to too many requests.", + "errorLink": "E0000047", + "errorId": "sampleQPivGUj_ND5v78vbYWW", + "errorCauses": [] +} +"#; + #[test] fn okta_generate_config() { test_generate_config::(); @@ -59,187 +221,143 @@ impl ValidatableComponent for OktaConfig { register_validatable_component!(OktaConfig); #[tokio::test] -async fn okta_compliance() { +async fn with_default_config() { let in_addr = next_addr(); let dummy_endpoint = warp::path!("api" / "v1" / "logs") - .and(warp::query::>()) - .map({ - move |q: std::collections::HashMap| match q.get("after") { - None => warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[{"data":"foo"},{"data":"bar"}]"#) - .unwrap(), - Some(_) => warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[]"#) - .unwrap(), - } + .and(warp::get()) + .and(warp::query::>()) + .and(warp::header::exact("Accept", "application/json")) + .map(|params: HashMap| { + assert!(params.contains_key("since")); + Response::builder() + .status(StatusCode::OK) + .body(OKTA_200_EMPTY) + .unwrap() }); tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); wait_for_tcp(in_addr).await; - let events = run_compliance(OktaConfig { + run_compliance(OktaConfig { domain: format!("http://{in_addr}"), token: "token".to_string(), interval: INTERVAL, timeout: TIMEOUT, - log_namespace: None, ..Default::default() }) .await; +} - assert_eq!(events.len(), 2); +#[tokio::test] +async fn with_okta_events() { + let in_addr = next_addr(); - for event in events.iter() { - assert_eq!( - event.as_log()[log_schema().source_type_key().unwrap().to_string()], - OktaConfig::NAME.into() - ); - } - let log_event = events[0].as_log(); - assert_eq!( - log_event - .get("data") - .expect("data must be available") - .as_str() - .unwrap(), - "foo" - ); + let dummy_endpoint = warp::path!("api" / "v1" / "logs") + .and(warp::get()) + .and(warp::query::>()) + .and(warp::header::exact("Accept", "application/json")) + .map(|params: HashMap| { + assert!(params.contains_key("since")); + Response::builder() + .status(StatusCode::OK) + .body(OKTA_200_RESPONSE) + .unwrap() + }); + + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; + + run_compliance(OktaConfig { + domain: format!("http://{in_addr}"), + token: "token".to_string(), + interval: INTERVAL, + timeout: TIMEOUT, + ..Default::default() + }) + .await; } #[tokio::test] -async fn okta_follows_rel() { - let addr = next_addr(); +async fn with_bad_request() { + let in_addr = next_addr(); let dummy_endpoint = warp::path!("api" / "v1" / "logs") - .and(warp::query::>()) - .map({ - move |q: std::collections::HashMap| match q.get("after") { - None => warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[{"data":"foo"}]"#) - .unwrap(), - Some(after) if after == "bar" => warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[{"data":"bar"}]"#) - .unwrap(), - Some(after) if after == "baz" => warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[]"#) - .unwrap(), - Some(_) => panic!("following Link header with zero length reply"), - } + .and(warp::get()) + .and(warp::query::>()) + .and(warp::header::exact("Accept", "application/json")) + .map(|_| { + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(OKTA_400_VALIDATION_FAILED) + .unwrap() }); - tokio::spawn(warp::serve(dummy_endpoint).run(addr)); - wait_for_tcp(addr).await; + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; - let events = run_compliance(OktaConfig { - domain: format!("http://{addr}"), + run_error(OktaConfig { + domain: format!("http://{in_addr}"), token: "token".to_string(), interval: INTERVAL, timeout: TIMEOUT, - log_namespace: None, ..Default::default() }) .await; +} - assert_eq!(events.len(), 2); +#[tokio::test] +async fn with_bad_token() { + let in_addr = next_addr(); - for event in events.iter() { - assert_eq!( - event.as_log()[log_schema().source_type_key().unwrap().to_string()], - OktaConfig::NAME.into() - ); - } - assert_eq!(events[0].as_log()["data"].as_str().unwrap(), "foo"); - assert_eq!(events[1].as_log()["data"].as_str().unwrap(), "bar"); + let dummy_endpoint = warp::path!("api" / "v1" / "logs") + .and(warp::get()) + .and(warp::query::>()) + .and(warp::header::exact("Accept", "application/json")) + .map(|_| { + Response::builder() + .status(StatusCode::FORBIDDEN) + .body(OKTA_403_ACCESS_DENIED) + .unwrap() + }); + + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; + + run_error(OktaConfig { + domain: format!("http://{in_addr}"), + token: "badtoken".to_string(), + interval: INTERVAL, + timeout: TIMEOUT, + ..Default::default() + }) + .await; } #[tokio::test] -async fn okta_persists_rel() { - // the client follows `next` links; on the next interval it should pick up where it left off - // and not start over from the beginning - let addr = next_addr(); - - let init_guard: Arc = Arc::new(AtomicBool::new(false)); +async fn with_rate_limit_exceeded() { + let in_addr = next_addr(); - // the first request sets `seen` but returns 0 events, ending the inner stream, - // the next interval should pick up where it left off let dummy_endpoint = warp::path!("api" / "v1" / "logs") - .and(warp::query::>()) - .map({ - move |q: std::collections::HashMap| match q.get("after") { - None => warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[{"data":"foo"}]"#) - .unwrap(), - Some(after) if after == "test" => { - let initialized = init_guard.swap(true, Ordering::Relaxed); - if !initialized { - warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[]"#) - .unwrap() - } else { - warp::http::Response::builder() - .header("Content-Type", "application/json") - .header( - "link", - format!("; rel=\"next\""), - ) - .body(r#"[{"initialized":"true"}]"#) - .unwrap() - } - } - Some(_) => warp::http::Response::builder() - .header("Content-Type", "application/json") - .body(r#"[]"#) - .unwrap(), - } + .and(warp::get()) + .and(warp::query::>()) + .and(warp::header::exact("Accept", "application/json")) + .map(|_| { + Response::builder() + .status(StatusCode::TOO_MANY_REQUESTS) + .body(OKTA_429_RATE_LIMIT_EXCEEDED) + .unwrap() }); - tokio::spawn(warp::serve(dummy_endpoint).run(addr)); - wait_for_tcp(addr).await; + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; - let events = run_compliance(OktaConfig { - domain: format!("http://{addr}"), + run_error(OktaConfig { + domain: format!("http://{in_addr}"), token: "token".to_string(), - interval: Duration::from_secs(1), - timeout: Duration::from_millis(100), + interval: INTERVAL, + timeout: TIMEOUT, ..Default::default() }) .await; - - assert_eq!(events.len(), 2); }