From 02f9b654bb83f32468c1a691db6c7a812c2a325e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 26 Dec 2019 23:12:02 +0100 Subject: [PATCH 1/5] feat: runtime independent examples --- Cargo.toml | 9 +++++++ examples/README.md | 6 +++-- examples/futures.rs | 57 ++++++++++++++++++++++++++++++++++++++++++ examples/tokio.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 examples/futures.rs create mode 100644 examples/tokio.rs diff --git a/Cargo.toml b/Cargo.toml index 424f33a..ef6c723 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ lettre_email = "0.9" pretty_assertions = "0.6.1" async-smtp = { version = "0.3.0" } async-std = { version = "1.8.0", default-features = false, features = ["std", "attributes"] } +tokio = "0.2.6" [[example]] name = "basic" @@ -51,6 +52,14 @@ required-features = ["default"] name = "gmail_oauth2" required-features = ["default"] +[[example]] +name = "futures" +required-features = ["default"] + +[[example]] +name = "tokio" +required-features = ["default"] + [[test]] name = "imap_integration" required-features = ["default"] diff --git a/examples/README.md b/examples/README.md index fe9abb6..241a71d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,7 +7,9 @@ Examples: * basic - This is a very basic example of using the client. * idle - This is a basic example of how to perform an IMAP IDLE call - and interrupt it based on typing a line into stdin. + and interrupt it based on typing a line into stdin. * gmail_oauth2 - This is an example using oauth2 for logging into - gmail via the OAUTH2 mechanism. + gmail via the OAUTH2 mechanism. + +* futures - The basic example, but using the `futures` executor. diff --git a/examples/futures.rs b/examples/futures.rs new file mode 100644 index 0000000..38c7937 --- /dev/null +++ b/examples/futures.rs @@ -0,0 +1,57 @@ +use async_imap::error::{Error, Result}; +use async_std::prelude::*; +use std::env; + +fn main() -> Result<()> { + futures::executor::block_on(async { + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("need three arguments: imap-server login password"); + Err(Error::Bad("need three arguments".into())) + } else { + let res = fetch_inbox_top(&args[1], &args[2], &args[3]).await?; + println!("**result:\n{}", res.unwrap()); + Ok(()) + } + }) +} + +async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Result> { + let tls = async_native_tls::TlsConnector::new(); + + // we pass in the imap_server twice to check that the server's TLS + // certificate is valid for the imap_server we're connecting to. + let client = async_imap::connect((imap_server, 993), imap_server, tls).await?; + println!("-- connected to {}:{}", imap_server, 993); + + // the client we have here is unauthenticated. + // to do anything useful with the e-mails, we need to log in + let mut imap_session = client.login(login, password).await.map_err(|e| e.0)?; + println!("-- logged in a {}", login); + + // we want to fetch the first email in the INBOX mailbox + imap_session.select("INBOX").await?; + println!("-- INBOX selected"); + + // fetch message number 1 in this mailbox, along with its RFC822 field. + // RFC 822 dictates the format of the body of e-mails + let messages_stream = imap_session.fetch("1", "RFC822").await?; + let messages: Vec<_> = messages_stream.collect::>().await?; + let message = if let Some(m) = messages.first() { + m + } else { + return Ok(None); + }; + + // extract the message's body + let body = message.body().expect("message did not have a body!"); + let body = std::str::from_utf8(body) + .expect("message was not valid utf-8") + .to_string(); + println!("-- 1 message received, logging out"); + + // be nice to the server and log out + imap_session.logout().await?; + + Ok(Some(body)) +} diff --git a/examples/tokio.rs b/examples/tokio.rs new file mode 100644 index 0000000..3f9202e --- /dev/null +++ b/examples/tokio.rs @@ -0,0 +1,60 @@ +use async_imap::error::{Error, Result}; +use async_std::prelude::*; +use std::env; +use tokio::runtime::Runtime; + +fn main() -> Result<()> { + let mut rt = Runtime::new().expect("unable to create runtime"); + + rt.block_on(async { + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("need three arguments: imap-server login password"); + Err(Error::Bad("need three arguments".into())) + } else { + let res = fetch_inbox_top(&args[1], &args[2], &args[3]).await?; + println!("**result:\n{}", res.unwrap()); + Ok(()) + } + }) +} + +async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Result> { + let tls = async_native_tls::TlsConnector::new(); + + // we pass in the imap_server twice to check that the server's TLS + // certificate is valid for the imap_server we're connecting to. + let client = async_imap::connect((imap_server, 993), imap_server, tls).await?; + println!("-- connected to {}:{}", imap_server, 993); + + // the client we have here is unauthenticated. + // to do anything useful with the e-mails, we need to log in + let mut imap_session = client.login(login, password).await.map_err(|e| e.0)?; + println!("-- logged in a {}", login); + + // we want to fetch the first email in the INBOX mailbox + imap_session.select("INBOX").await?; + println!("-- INBOX selected"); + + // fetch message number 1 in this mailbox, along with its RFC822 field. + // RFC 822 dictates the format of the body of e-mails + let messages_stream = imap_session.fetch("1", "RFC822").await?; + let messages: Vec<_> = messages_stream.collect::>().await?; + let message = if let Some(m) = messages.first() { + m + } else { + return Ok(None); + }; + + // extract the message's body + let body = message.body().expect("message did not have a body!"); + let body = std::str::from_utf8(body) + .expect("message was not valid utf-8") + .to_string(); + println!("-- 1 message received, logging out"); + + // be nice to the server and log out + imap_session.logout().await?; + + Ok(Some(body)) +} From 4af4c852d1dedbd3cbc68911d21f939d40ecb211 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 24 May 2022 13:05:27 +0200 Subject: [PATCH 2/5] feat: update deps --- Cargo.toml | 14 ++++----- examples/tokio.rs | 2 +- src/mock_stream.rs | 15 +-------- src/parse.rs | 4 +-- src/types/name.rs | 66 ++------------------------------------- tests/imap_integration.rs | 2 +- 6 files changed, 15 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ef6c723..b8a5440 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,15 +22,15 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" } default = [] [dependencies] -imap-proto = "0.14.3" -nom = "6.0" +imap-proto = "0.16.1" +nom = "7.0" base64 = "0.13" chrono = "0.4" -async-native-tls = { version = "0.3.3" } +async-native-tls = { version = "0.3" } async-std = { version = "1.8.0", default-features = false, features = ["std"] } pin-utils = "0.1.0-alpha.4" futures = "0.3.15" -ouroboros = "0.9" +ouroboros = "0.15" stop-token = "0.7" byte-pool = "0.2.2" once_cell = "1.8.0" @@ -39,10 +39,10 @@ thiserror = "1.0.9" [dev-dependencies] lettre_email = "0.9" -pretty_assertions = "0.6.1" -async-smtp = { version = "0.3.0" } +pretty_assertions = "1.2" +async-smtp = { version = "0.4" } async-std = { version = "1.8.0", default-features = false, features = ["std", "attributes"] } -tokio = "0.2.6" +tokio = { version = "1", features = ["rt-multi-thread"] } [[example]] name = "basic" diff --git a/examples/tokio.rs b/examples/tokio.rs index 3f9202e..1bd9c32 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -4,7 +4,7 @@ use std::env; use tokio::runtime::Runtime; fn main() -> Result<()> { - let mut rt = Runtime::new().expect("unable to create runtime"); + let rt = Runtime::new().expect("unable to create runtime"); rt.block_on(async { let args: Vec = env::args().collect(); diff --git a/src/mock_stream.rs b/src/mock_stream.rs index fb19667..5d1743f 100644 --- a/src/mock_stream.rs +++ b/src/mock_stream.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use async_std::io::{Error, ErrorKind, Read, Result, Write}; use futures::task::{Context, Poll}; -#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[derive(Default, Clone, Debug, Eq, PartialEq, Hash)] pub struct MockStream { read_buf: Vec, read_pos: usize, @@ -14,19 +14,6 @@ pub struct MockStream { read_delay: usize, } -impl Default for MockStream { - fn default() -> Self { - MockStream { - read_buf: Vec::new(), - read_pos: 0, - written_buf: Vec::new(), - err_on_read: false, - eof_on_read: false, - read_delay: 0, - } - } -} - impl MockStream { pub fn new(read_buf: Vec) -> MockStream { MockStream::default().with_buf(read_buf) diff --git a/src/parse.rs b/src/parse.rs index 07663de..79a6170 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -446,7 +446,7 @@ mod tests { assert_eq!(names.len(), 1); assert_eq!( names[0].attributes(), - &[NameAttribute::from("\\HasNoChildren")] + &[NameAttribute::Extension("\\HasNoChildren".into())] ); assert_eq!(names[0].delimiter(), Some(".")); assert_eq!(names[0].name(), "INBOX"); @@ -535,7 +535,7 @@ mod tests { assert_eq!(names.len(), 1); assert_eq!( names[0].attributes(), - &[NameAttribute::from("\\HasNoChildren")] + &[NameAttribute::Extension("\\HasNoChildren".into())] ); assert_eq!(names[0].delimiter(), Some(".")); assert_eq!(names[0].name(), "INBOX"); diff --git a/src/types/name.rs b/src/types/name.rs index 43374b7..be978ed 100644 --- a/src/types/name.rs +++ b/src/types/name.rs @@ -1,5 +1,4 @@ -use std::borrow::Cow; - +pub use imap_proto::types::NameAttribute; use imap_proto::{MailboxDatum, Response}; use crate::types::ResponseData; @@ -21,74 +20,15 @@ pub struct InnerName<'a> { name: &'a str, } -/// An attribute set for an IMAP name. -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub enum NameAttribute<'a> { - /// It is not possible for any child levels of hierarchy to exist - /// under this name; no child levels exist now and none can be - /// created in the future. - NoInferiors, - - /// It is not possible to use this name as a selectable mailbox. - NoSelect, - - /// The mailbox has been marked "interesting" by the server; the - /// mailbox probably contains messages that have been added since - /// the last time the mailbox was selected. - Marked, - - /// The mailbox does not contain any additional messages since the - /// last time the mailbox was selected. - Unmarked, - - /// A non-standard user- or server-defined name attribute. - Custom(Cow<'a, str>), -} - -impl NameAttribute<'static> { - fn system(s: &str) -> Option { - match s { - "\\Noinferiors" => Some(NameAttribute::NoInferiors), - "\\Noselect" => Some(NameAttribute::NoSelect), - "\\Marked" => Some(NameAttribute::Marked), - "\\Unmarked" => Some(NameAttribute::Unmarked), - _ => None, - } - } -} - -impl<'a> From for NameAttribute<'a> { - fn from(s: String) -> Self { - if let Some(f) = NameAttribute::system(&s) { - f - } else { - NameAttribute::Custom(Cow::Owned(s)) - } - } -} - -impl<'a> From<&'a str> for NameAttribute<'a> { - fn from(s: &'a str) -> Self { - if let Some(f) = NameAttribute::system(s) { - f - } else { - NameAttribute::Custom(Cow::Borrowed(s)) - } - } -} - impl Name { pub(crate) fn from_mailbox_data(resp: ResponseData) -> Self { Name::new(Box::new(resp), |response| match response.parsed() { Response::MailboxData(MailboxDatum::List { - flags, + name_attributes, delimiter, name, }) => InnerName { - attributes: flags - .iter() - .map(|s| NameAttribute::from(s.as_ref())) - .collect(), + attributes: name_attributes.to_owned(), delimiter: delimiter.as_deref(), name, }, diff --git a/tests/imap_integration.rs b/tests/imap_integration.rs index 817cbe1..30148c5 100644 --- a/tests/imap_integration.rs +++ b/tests/imap_integration.rs @@ -110,7 +110,7 @@ fn make_email(to: &str) -> async_smtp::SendableEmail { vec![to.parse().unwrap()], ) .unwrap(), - message_id.to_string(), + message_id, format!("To: <{}>\r\nFrom: \r\nMessage-ID: <{}.msg@localhost>\r\nSubject: My first e-mail\r\n\r\nHello world from SMTP", to, message_id), ) } From 2b5b5ad42a12a23b8b39630e78c61f627af83dec Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 24 May 2022 13:21:40 +0200 Subject: [PATCH 3/5] refactor: reduce dependency on async-std --- examples/tokio.rs | 7 ++++--- src/client.rs | 4 ++-- src/error.rs | 3 +-- src/extensions/idle.rs | 5 ++--- src/extensions/quota.rs | 11 +++++------ src/imap_stream.rs | 8 ++++---- src/lib.rs | 4 ++-- src/mock_stream.rs | 2 +- src/parse.rs | 27 +++++++++++++++------------ 9 files changed, 36 insertions(+), 35 deletions(-) diff --git a/examples/tokio.rs b/examples/tokio.rs index 1bd9c32..86131cc 100644 --- a/examples/tokio.rs +++ b/examples/tokio.rs @@ -1,6 +1,7 @@ -use async_imap::error::{Error, Result}; -use async_std::prelude::*; use std::env; + +use async_imap::error::{Error, Result}; +use futures::TryStreamExt; use tokio::runtime::Runtime; fn main() -> Result<()> { @@ -39,7 +40,7 @@ async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Resu // fetch message number 1 in this mailbox, along with its RFC822 field. // RFC 822 dictates the format of the body of e-mails let messages_stream = imap_session.fetch("1", "RFC822").await?; - let messages: Vec<_> = messages_stream.collect::>().await?; + let messages: Vec<_> = messages_stream.try_collect().await?; let message = if let Some(m) = messages.first() { m } else { diff --git a/src/client.rs b/src/client.rs index b72b0e9..45ade70 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,10 +6,10 @@ use std::str; use async_native_tls::{TlsConnector, TlsStream}; use async_std::channel; -use async_std::io::{self, Read, Write}; use async_std::net::{TcpStream, ToSocketAddrs}; -use async_std::prelude::*; use extensions::quota::parse_get_quota_root; +use futures::io::{self, AsyncRead as Read, AsyncWrite as Write}; +use futures::prelude::*; use imap_proto::{RequestId, Response}; use super::authenticator::Authenticator; diff --git a/src/error.rs b/src/error.rs index fe686ea..a385818 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,13 +1,12 @@ //! IMAP error types. use std::io::Error as IoError; -use std::result; use std::str::Utf8Error; use base64::DecodeError; /// A convenience wrapper around `Result` for `imap::Error`. -pub type Result = result::Result; +pub type Result = std::result::Result; /// A set of errors that can occur in the IMAP client #[derive(thiserror::Error, Debug)] diff --git a/src/extensions/idle.rs b/src/extensions/idle.rs index 1dc49bb..70f1b88 100644 --- a/src/extensions/idle.rs +++ b/src/extensions/idle.rs @@ -4,9 +4,8 @@ use std::fmt; use std::pin::Pin; use std::time::Duration; -use async_std::io::{self, Read, Write}; -use async_std::prelude::*; -use async_std::stream::Stream; +use futures::io::{self, AsyncRead as Read, AsyncWrite as Write}; +use futures::prelude::*; use futures::task::{Context, Poll}; use imap_proto::{RequestId, Response, Status}; use stop_token::prelude::*; diff --git a/src/extensions/quota.rs b/src/extensions/quota.rs index 7d4abc4..0b2676e 100644 --- a/src/extensions/quota.rs +++ b/src/extensions/quota.rs @@ -1,15 +1,14 @@ //! Adds support for the GETQUOTA and GETQUOTAROOT commands specificed in [RFC2087](https://tools.ietf.org/html/rfc2087). use async_std::channel; -use async_std::io; -use async_std::prelude::*; -use async_std::stream::Stream; +use futures::io; +use futures::prelude::*; use imap_proto::{self, RequestId, Response}; use crate::types::*; use crate::{ error::Result, - parse::{filter_sync, handle_unilateral}, + parse::{filter, handle_unilateral}, }; use crate::{ error::{Error, ParseError}, @@ -23,7 +22,7 @@ pub(crate) async fn parse_get_quota> + ) -> Result { let mut quota = None; while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -53,7 +52,7 @@ pub(crate) async fn parse_get_quota_root = Vec::new(); while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { diff --git a/src/imap_stream.rs b/src/imap_stream.rs index 492f0a5..5480530 100644 --- a/src/imap_stream.rs +++ b/src/imap_stream.rs @@ -1,12 +1,12 @@ use std::fmt; use std::pin::Pin; +use std::sync::Arc; -use async_std::io::{self, Read, Write}; -use async_std::prelude::*; -use async_std::stream::Stream; -use async_std::sync::Arc; use byte_pool::{Block, BytePool}; +use futures::io::{self, AsyncRead as Read, AsyncWrite as Write}; +use futures::stream::Stream; use futures::task::{Context, Poll}; +use futures::AsyncWriteExt; use nom::Needed; use once_cell::sync::Lazy; diff --git a/src/lib.rs b/src/lib.rs index 91be90f..aeb238e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ //! Below is a basic client example. See the `examples/` directory for more. //! //! ```no_run -//! use async_std::prelude::*; +//! use futures::prelude::*; //! use async_imap::error::Result; //! //! async fn fetch_inbox_top() -> Result> { @@ -40,7 +40,7 @@ //! // fetch message number 1 in this mailbox, along with its RFC822 field. //! // RFC 822 dictates the format of the body of e-mails //! let messages_stream = imap_session.fetch("1", "RFC822").await?; -//! let messages: Vec<_> = messages_stream.collect::>().await?; +//! let messages: Vec<_> = messages_stream.try_collect().await?; //! let message = if let Some(m) = messages.first() { //! m //! } else { diff --git a/src/mock_stream.rs b/src/mock_stream.rs index 5d1743f..8602779 100644 --- a/src/mock_stream.rs +++ b/src/mock_stream.rs @@ -1,7 +1,7 @@ use std::cmp::min; use std::pin::Pin; -use async_std::io::{Error, ErrorKind, Read, Result, Write}; +use futures::io::{AsyncRead as Read, AsyncWrite as Write, Error, ErrorKind, Result}; use futures::task::{Context, Poll}; #[derive(Default, Clone, Debug, Eq, PartialEq, Hash)] diff --git a/src/parse.rs b/src/parse.rs index 79a6170..b999127 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -1,9 +1,9 @@ use std::collections::HashSet; use async_std::channel; -use async_std::io; -use async_std::prelude::*; -use async_std::stream::Stream; +use futures::io; +use futures::prelude::*; +use futures::stream::Stream; use imap_proto::{self, MailboxDatum, RequestId, Response}; use crate::error::{Error, Result}; @@ -41,7 +41,10 @@ pub(crate) fn parse_names> + Unpin + S ) } -fn filter(res: &io::Result, command_tag: &RequestId) -> impl Future { +pub(crate) fn filter( + res: &io::Result, + command_tag: &RequestId, +) -> impl Future { let val = filter_sync(res, command_tag); futures::future::ready(val) } @@ -121,7 +124,7 @@ pub(crate) async fn parse_capabilities let mut caps: HashSet = HashSet::new(); while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -147,7 +150,7 @@ pub(crate) async fn parse_noop> + Unpi command_tag: RequestId, ) -> Result<()> { while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -294,7 +297,7 @@ pub(crate) async fn parse_ids> + Unpin let mut ids: HashSet = HashSet::new(); while let Some(resp) = stream - .take_while(|res| filter_sync(res, &command_tag)) + .take_while(|res| filter(res, &command_tag)) .next() .await { @@ -439,7 +442,7 @@ mod tests { let id = RequestId("A0001".into()); let names: Vec<_> = parse_names(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert!(recv.is_empty()); @@ -460,7 +463,7 @@ mod tests { let id = RequestId("a".into()); let fetches = parse_fetches(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert!(recv.is_empty()); @@ -478,7 +481,7 @@ mod tests { let id = RequestId("a".into()); let fetches = parse_fetches(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert!(recv.is_empty()); @@ -505,7 +508,7 @@ mod tests { let id = RequestId("a".into()); let fetches = parse_fetches(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); assert_eq!(recv.recv().await.unwrap(), UnsolicitedResponse::Recent(1)); @@ -526,7 +529,7 @@ mod tests { let id = RequestId("A0001".into()); let names = parse_names(&mut stream, send, id) - .collect::>>() + .try_collect::>() .await .unwrap(); From 3a7d108ded8276f44921d22ec28efc955d3fb2e3 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 22 Jun 2022 23:24:28 +0200 Subject: [PATCH 4/5] full tokio support --- Cargo.toml | 38 ++-- examples/Cargo.toml | 25 +++ examples/gmail_oauth2.rs | 63 ------ examples/{ => src/bin}/basic.rs | 32 +-- examples/{ => src/bin}/futures.rs | 7 +- examples/src/bin/gmail_oauth2.rs | 62 ++++++ examples/{ => src/bin}/idle.rs | 39 ++-- examples/src/bin/integration.rs | 301 ++++++++++++++++++++++++++++ examples/tokio.rs | 61 ------ src/client.rs | 119 +++++++---- src/extensions/idle.rs | 23 ++- src/extensions/quota.rs | 2 +- src/imap_stream.rs | 27 ++- src/lib.rs | 5 + src/mock_stream.rs | 58 +++++- src/parse.rs | 68 ++++--- tests/imap_integration.rs | 322 ------------------------------ 17 files changed, 660 insertions(+), 592 deletions(-) create mode 100644 examples/Cargo.toml delete mode 100644 examples/gmail_oauth2.rs rename examples/{ => src/bin}/basic.rs (72%) rename examples/{ => src/bin}/futures.rs (95%) create mode 100644 examples/src/bin/gmail_oauth2.rs rename examples/{ => src/bin}/idle.rs (78%) create mode 100644 examples/src/bin/integration.rs delete mode 100644 examples/tokio.rs delete mode 100644 tests/imap_integration.rs diff --git a/Cargo.toml b/Cargo.toml index b8a5440..f22fe64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,15 +19,16 @@ is-it-maintained-issue-resolution = { repository = "async-email/async-imap" } is-it-maintained-open-issues = { repository = "async-email/async-imap" } [features] -default = [] +default = ["runtime-async-std"] + +runtime-async-std = ["async-std", "async-native-tls/runtime-async-std"] +runtime-tokio = ["tokio", "async-native-tls/runtime-tokio"] [dependencies] imap-proto = "0.16.1" nom = "7.0" base64 = "0.13" chrono = "0.4" -async-native-tls = { version = "0.3" } -async-std = { version = "1.8.0", default-features = false, features = ["std"] } pin-utils = "0.1.0-alpha.4" futures = "0.3.15" ouroboros = "0.15" @@ -36,30 +37,15 @@ byte-pool = "0.2.2" once_cell = "1.8.0" log = "0.4.8" thiserror = "1.0.9" +async-channel = "1.6.1" -[dev-dependencies] -lettre_email = "0.9" -pretty_assertions = "1.2" -async-smtp = { version = "0.4" } -async-std = { version = "1.8.0", default-features = false, features = ["std", "attributes"] } -tokio = { version = "1", features = ["rt-multi-thread"] } - -[[example]] -name = "basic" -required-features = ["default"] +async-native-tls = { version = "0.4", default-features = false } +async-std = { version = "1.8.0", default-features = false, features = ["std"], optional = true } +tokio = { version = "1", features = ["net", "sync", "time"], optional = true } -[[example]] -name = "gmail_oauth2" -required-features = ["default"] -[[example]] -name = "futures" -required-features = ["default"] - -[[example]] -name = "tokio" -required-features = ["default"] +[dev-dependencies] +pretty_assertions = "1.2" +async-std = { version = "1.8.0", features = ["std", "attributes"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } -[[test]] -name = "imap_integration" -required-features = ["default"] diff --git a/examples/Cargo.toml b/examples/Cargo.toml new file mode 100644 index 0000000..32038e4 --- /dev/null +++ b/examples/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "async-imap-examples" +version = "0.1.0" +publish = false +authors = ["dignifiedquire "] +license = "Apache-2.0/MIT" +edition = "2018" + +[features] +default = ["runtime-async-std"] + +runtime-async-std = ["async-std", "async-native-tls/runtime-async-std", "async-smtp/runtime-async-std", "async-imap/runtime-async-std"] +runtime-tokio = ["tokio", "async-native-tls/runtime-tokio", "async-smtp/runtime-tokio", "async-imap/runtime-tokio"] + +[dependencies] +async-imap = { path = "../", default-features = false } +async-native-tls = { version = "0.4", default-features = false } +async-smtp = { version = "0.4", default-features = false, features = ["smtp-transport"] } + +async-std = { version = "1.8.0", features = ["std", "attributes"], optional = true } +futures = "0.3.21" +tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } + +[patch.crates-io] +async-smtp = { git = "https://github.com/async-email/async-smtp", branch = "tokio" } diff --git a/examples/gmail_oauth2.rs b/examples/gmail_oauth2.rs deleted file mode 100644 index f5c382e..0000000 --- a/examples/gmail_oauth2.rs +++ /dev/null @@ -1,63 +0,0 @@ -use async_imap::error::Result; -use async_std::prelude::*; -use async_std::task; - -struct GmailOAuth2 { - user: String, - access_token: String, -} - -impl async_imap::Authenticator for &GmailOAuth2 { - type Response = String; - - fn process(&mut self, _data: &[u8]) -> Self::Response { - format!( - "user={}\x01auth=Bearer {}\x01\x01", - self.user, self.access_token - ) - } -} - -fn main() -> Result<()> { - task::block_on(async move { - let gmail_auth = GmailOAuth2 { - user: String::from("sombody@gmail.com"), - access_token: String::from(""), - }; - let domain = "imap.gmail.com"; - let port = 993; - let socket_addr = (domain, port); - let tls = async_native_tls::TlsConnector::new(); - let client = async_imap::connect(socket_addr, domain, tls).await?; - - let mut imap_session = match client.authenticate("XOAUTH2", &gmail_auth).await { - Ok(c) => c, - Err((e, _unauth_client)) => { - println!("error authenticating: {}", e); - return Err(e); - } - }; - - match imap_session.select("INBOX").await { - Ok(mailbox) => println!("{}", mailbox), - Err(e) => println!("Error selecting INBOX: {}", e), - }; - - { - let mut msgs = imap_session.fetch("2", "body[text]").await.map_err(|e| { - eprintln!("Error Fetching email 2: {}", e); - e - })?; - - // TODO: get rid of this - let mut msgs = unsafe { std::pin::Pin::new_unchecked(&mut msgs) }; - - while let Some(msg) = msgs.next().await { - print!("{:?}", msg?); - } - } - - imap_session.logout().await?; - Ok(()) - }) -} diff --git a/examples/basic.rs b/examples/src/bin/basic.rs similarity index 72% rename from examples/basic.rs rename to examples/src/bin/basic.rs index 333ceab..25ce577 100644 --- a/examples/basic.rs +++ b/examples/src/bin/basic.rs @@ -1,20 +1,20 @@ -use async_imap::error::{Error, Result}; -use async_std::prelude::*; -use async_std::task; use std::env; -fn main() -> Result<()> { - task::block_on(async { - let args: Vec = env::args().collect(); - if args.len() != 4 { - eprintln!("need three arguments: imap-server login password"); - Err(Error::Bad("need three arguments".into())) - } else { - let res = fetch_inbox_top(&args[1], &args[2], &args[3]).await?; - println!("**result:\n{}", res.unwrap()); - Ok(()) - } - }) +use async_imap::error::{Error, Result}; +use futures::TryStreamExt; + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() -> Result<()> { + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("need three arguments: imap-server login password"); + Err(Error::Bad("need three arguments".into())) + } else { + let res = fetch_inbox_top(&args[1], &args[2], &args[3]).await?; + println!("**result:\n{}", res.unwrap()); + Ok(()) + } } async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Result> { @@ -38,7 +38,7 @@ async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Resu // fetch message number 1 in this mailbox, along with its RFC822 field. // RFC 822 dictates the format of the body of e-mails let messages_stream = imap_session.fetch("1", "RFC822").await?; - let messages: Vec<_> = messages_stream.collect::>().await?; + let messages: Vec<_> = messages_stream.try_collect().await?; let message = if let Some(m) = messages.first() { m } else { diff --git a/examples/futures.rs b/examples/src/bin/futures.rs similarity index 95% rename from examples/futures.rs rename to examples/src/bin/futures.rs index 38c7937..5a898e1 100644 --- a/examples/futures.rs +++ b/examples/src/bin/futures.rs @@ -1,7 +1,8 @@ -use async_imap::error::{Error, Result}; -use async_std::prelude::*; use std::env; +use async_imap::error::{Error, Result}; +use futures::TryStreamExt; + fn main() -> Result<()> { futures::executor::block_on(async { let args: Vec = env::args().collect(); @@ -36,7 +37,7 @@ async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Resu // fetch message number 1 in this mailbox, along with its RFC822 field. // RFC 822 dictates the format of the body of e-mails let messages_stream = imap_session.fetch("1", "RFC822").await?; - let messages: Vec<_> = messages_stream.collect::>().await?; + let messages: Vec<_> = messages_stream.try_collect().await?; let message = if let Some(m) = messages.first() { m } else { diff --git a/examples/src/bin/gmail_oauth2.rs b/examples/src/bin/gmail_oauth2.rs new file mode 100644 index 0000000..2c0756b --- /dev/null +++ b/examples/src/bin/gmail_oauth2.rs @@ -0,0 +1,62 @@ +use async_imap::error::Result; +use futures::StreamExt; + +struct GmailOAuth2 { + user: String, + access_token: String, +} + +impl async_imap::Authenticator for &GmailOAuth2 { + type Response = String; + + fn process(&mut self, _data: &[u8]) -> Self::Response { + format!( + "user={}\x01auth=Bearer {}\x01\x01", + self.user, self.access_token + ) + } +} + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() -> Result<()> { + let gmail_auth = GmailOAuth2 { + user: String::from("sombody@gmail.com"), + access_token: String::from(""), + }; + let domain = "imap.gmail.com"; + let port = 993; + let socket_addr = (domain, port); + let tls = async_native_tls::TlsConnector::new(); + let client = async_imap::connect(socket_addr, domain, tls).await?; + + let mut imap_session = match client.authenticate("XOAUTH2", &gmail_auth).await { + Ok(c) => c, + Err((e, _unauth_client)) => { + println!("error authenticating: {}", e); + return Err(e); + } + }; + + match imap_session.select("INBOX").await { + Ok(mailbox) => println!("{}", mailbox), + Err(e) => println!("Error selecting INBOX: {}", e), + }; + + { + let mut msgs = imap_session.fetch("2", "body[text]").await.map_err(|e| { + eprintln!("Error Fetching email 2: {}", e); + e + })?; + + // TODO: get rid of this + let mut msgs = unsafe { std::pin::Pin::new_unchecked(&mut msgs) }; + + while let Some(msg) = msgs.next().await { + print!("{:?}", msg?); + } + } + + imap_session.logout().await?; + Ok(()) +} diff --git a/examples/idle.rs b/examples/src/bin/idle.rs similarity index 78% rename from examples/idle.rs rename to examples/src/bin/idle.rs index 4aa83f9..60af895 100644 --- a/examples/idle.rs +++ b/examples/src/bin/idle.rs @@ -1,22 +1,27 @@ -use async_imap::error::{Error, Result}; -use async_std::prelude::*; -// use async_std::io; -use async_imap::extensions::idle::IdleResponse::*; -use async_std::task; use std::env; use std::time::Duration; -fn main() -> Result<()> { - task::block_on(async { - let args: Vec = env::args().collect(); - if args.len() != 4 { - eprintln!("need three arguments: imap-server login password"); - Err(Error::Bad("need three arguments".into())) - } else { - fetch_and_idle(&args[1], &args[2], &args[3]).await?; - Ok(()) - } - }) +use async_imap::error::{Error, Result}; +use async_imap::extensions::idle::IdleResponse::*; +use futures::StreamExt; + +#[cfg(feature = "runtime-async-std")] +use async_std::{task, task::sleep}; + +#[cfg(feature = "runtime-tokio")] +use tokio::{task, time::sleep}; + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() -> Result<()> { + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("need three arguments: imap-server login password"); + Err(Error::Bad("need three arguments".into())) + } else { + fetch_and_idle(&args[1], &args[2], &args[3]).await?; + Ok(()) + } } async fn fetch_and_idle(imap_server: &str, login: &str, password: &str) -> Result<()> { @@ -59,7 +64,7 @@ async fn fetch_and_idle(imap_server: &str, login: &str, password: &str) -> Resul task::spawn(async move { println!("-- thread: waiting for 30s"); - task::sleep(Duration::from_secs(30)).await; + sleep(Duration::from_secs(30)).await; println!("-- thread: waited 30 secs, now interrupting idle"); drop(interrupt); }); diff --git a/examples/src/bin/integration.rs b/examples/src/bin/integration.rs new file mode 100644 index 0000000..a6882f7 --- /dev/null +++ b/examples/src/bin/integration.rs @@ -0,0 +1,301 @@ +use std::borrow::Cow; +use std::time::Duration; + +use async_imap::Session; +use async_native_tls::TlsConnector; +use async_smtp::ServerAddress; +#[cfg(feature = "runtime-async-std")] +use async_std::{net::TcpStream, task, task::sleep}; +use futures::{StreamExt, TryStreamExt}; +#[cfg(feature = "runtime-tokio")] +use tokio::{net::TcpStream, task, time::sleep}; + +fn native_tls() -> async_native_tls::TlsConnector { + async_native_tls::TlsConnector::new() + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true) +} + +fn tls() -> TlsConnector { + TlsConnector::new() + .danger_accept_invalid_hostnames(true) + .danger_accept_invalid_certs(true) +} + +fn test_host() -> String { + std::env::var("TEST_HOST").unwrap_or_else(|_| "127.0.0.1".into()) +} + +async fn session(user: &str) -> Session> { + async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) + .await + .unwrap() + .login(user, user) + .await + .ok() + .unwrap() +} + +async fn smtp(user: &str) -> async_smtp::SmtpTransport { + let creds = + async_smtp::smtp::authentication::Credentials::new(user.to_string(), user.to_string()); + async_smtp::SmtpClient::with_security( + ServerAddress::new(test_host(), 3465), + async_smtp::ClientSecurity::Wrapper(async_smtp::ClientTlsParameters { + connector: native_tls(), + domain: "localhost".to_string(), + }), + ) + .credentials(creds) + .into_transport() +} + +async fn _connect_insecure_then_secure() { + let stream = TcpStream::connect((test_host().as_ref(), 3143)) + .await + .unwrap(); + + // ignored because of https://github.com/greenmail-mail-test/greenmail/issues/135 + async_imap::Client::new(stream) + .secure("imap.example.com", tls()) + .await + .unwrap(); +} + +async fn _connect_secure() { + async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) + .await + .unwrap(); +} + +async fn login() { + session("readonly-test@localhost").await; +} + +async fn logout() { + let mut s = session("readonly-test@localhost").await; + s.logout().await.unwrap(); +} + +async fn inbox_zero() { + // https://github.com/greenmail-mail-test/greenmail/issues/265 + let mut s = session("readonly-test@localhost").await; + s.select("INBOX").await.unwrap(); + let inbox = s.search("ALL").await.unwrap(); + assert_eq!(inbox.len(), 0); +} + +fn make_email(to: &str) -> async_smtp::SendableEmail { + let message_id = "abc"; + async_smtp::SendableEmail::new( + async_smtp::Envelope::new( + Some("sender@localhost".parse().unwrap()), + vec![to.parse().unwrap()], + ) + .unwrap(), + message_id, + format!("To: <{}>\r\nFrom: \r\nMessage-ID: <{}.msg@localhost>\r\nSubject: My first e-mail\r\n\r\nHello world from SMTP", to, message_id), + ) +} + +async fn inbox() { + let to = "inbox@localhost"; + + // first log in so we'll see the unsolicited e-mails + let mut c = session(to).await; + c.select("INBOX").await.unwrap(); + + println!("sending"); + let mut s = smtp(to).await; + + // then send the e-mail + let mail = make_email(to); + s.connect_and_send(mail).await.unwrap(); + + println!("searching"); + + // now we should see the e-mail! + let inbox = c.search("ALL").await.unwrap(); + // and the one message should have the first message sequence number + assert_eq!(inbox.len(), 1); + assert!(inbox.contains(&1)); + + // we should also get two unsolicited responses: Exists and Recent + c.noop().await.unwrap(); + println!("noop done"); + let mut unsolicited = Vec::new(); + while !c.unsolicited_responses.is_empty() { + unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); + } + + assert_eq!(unsolicited.len(), 2); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); + + println!("fetching"); + + // let's see that we can also fetch the e-mail + let fetch: Vec<_> = c + .fetch("1", "(ALL UID)") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert_eq!(fetch.len(), 1); + let fetch = &fetch[0]; + assert_eq!(fetch.message, 1); + assert_ne!(fetch.uid, None); + assert_eq!(fetch.size, Some(21)); + let e = fetch.envelope().unwrap(); + assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); + assert_ne!(e.from, None); + assert_eq!(e.from.as_ref().unwrap().len(), 1); + let from = &e.from.as_ref().unwrap()[0]; + assert_eq!(from.mailbox, Some(Cow::Borrowed(&b"sender"[..]))); + assert_eq!(from.host, Some(Cow::Borrowed(&b"localhost"[..]))); + assert_ne!(e.to, None); + assert_eq!(e.to.as_ref().unwrap().len(), 1); + let to = &e.to.as_ref().unwrap()[0]; + assert_eq!(to.mailbox, Some(Cow::Borrowed(&b"inbox"[..]))); + assert_eq!(to.host, Some(Cow::Borrowed(&b"localhost"[..]))); + let date_opt = fetch.internal_date(); + assert!(date_opt.is_some()); + + // and let's delete it to clean up + c.store("1", "+FLAGS (\\Deleted)") + .await + .unwrap() + .collect::>() + .await; + c.expunge().await.unwrap().collect::>().await; + + // the e-mail should be gone now + let inbox = c.search("ALL").await.unwrap(); + assert_eq!(inbox.len(), 0); +} + +async fn inbox_uid() { + let to = "inbox-uid@localhost"; + + // first log in so we'll see the unsolicited e-mails + let mut c = session(to).await; + c.select("INBOX").await.unwrap(); + + // then send the e-mail + let mut s = smtp(to).await; + let e = make_email(to); + s.connect_and_send(e).await.unwrap(); + + // now we should see the e-mail! + let inbox = c.uid_search("ALL").await.unwrap(); + // and the one message should have the first message sequence number + assert_eq!(inbox.len(), 1); + let uid = inbox.into_iter().next().unwrap(); + + // we should also get two unsolicited responses: Exists and Recent + c.noop().await.unwrap(); + let mut unsolicited = Vec::new(); + while !c.unsolicited_responses.is_empty() { + unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); + } + + assert_eq!(unsolicited.len(), 2); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); + assert!(unsolicited + .iter() + .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); + + // let's see that we can also fetch the e-mail + let fetch: Vec<_> = c + .uid_fetch(format!("{}", uid), "(ALL UID)") + .await + .unwrap() + .try_collect() + .await + .unwrap(); + assert_eq!(fetch.len(), 1); + let fetch = &fetch[0]; + assert_eq!(fetch.uid, Some(uid)); + let e = fetch.envelope().unwrap(); + assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); + let date_opt = fetch.internal_date(); + assert!(date_opt.is_some()); + + // and let's delete it to clean up + c.uid_store(format!("{}", uid), "+FLAGS (\\Deleted)") + .await + .unwrap() + .collect::>() + .await; + c.expunge().await.unwrap().collect::>().await; + + // the e-mail should be gone now + let inbox = c.search("ALL").await.unwrap(); + assert_eq!(inbox.len(), 0); +} + +async fn _list() { + let mut s = session("readonly-test@localhost").await; + s.select("INBOX").await.unwrap(); + let subdirs: Vec<_> = s.list(None, Some("%")).await.unwrap().collect().await; + assert_eq!(subdirs.len(), 0); + + // TODO: make a subdir +} + +// Greenmail does not support IDLE :( +async fn _idle() -> async_imap::error::Result<()> { + let mut session = session("idle-test@localhost").await; + + // get that inbox + let res = session.select("INBOX").await?; + println!("selected: {:#?}", res); + + // fetchy fetch + let msg_stream = session.fetch("1:3", "(FLAGS BODY.PEEK[])").await?; + let msgs = msg_stream.collect::>().await; + println!("msgs: {:?}", msgs.len()); + + // Idle session + println!("starting idle"); + let mut idle = session.idle(); + idle.init().await?; + + let (idle_wait, interrupt) = idle.wait_with_timeout(std::time::Duration::from_secs(30)); + println!("idle wait"); + + task::spawn(async move { + println!("waiting for 1s"); + sleep(Duration::from_secs(2)).await; + println!("interrupting idle"); + drop(interrupt); + }); + + let idle_result = idle_wait.await; + println!("idle result: {:#?}", &idle_result); + + // return the session after we are done with it + let mut session = idle.done().await?; + + println!("logging out"); + session.logout().await?; + + Ok(()) +} + +#[cfg_attr(feature = "runtime-tokio", tokio::main)] +#[cfg_attr(feature = "runtime-async-std", async_std::main)] +async fn main() { + login().await; + logout().await; + inbox_zero().await; + inbox().await; + inbox_uid().await; +} diff --git a/examples/tokio.rs b/examples/tokio.rs deleted file mode 100644 index 86131cc..0000000 --- a/examples/tokio.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::env; - -use async_imap::error::{Error, Result}; -use futures::TryStreamExt; -use tokio::runtime::Runtime; - -fn main() -> Result<()> { - let rt = Runtime::new().expect("unable to create runtime"); - - rt.block_on(async { - let args: Vec = env::args().collect(); - if args.len() != 4 { - eprintln!("need three arguments: imap-server login password"); - Err(Error::Bad("need three arguments".into())) - } else { - let res = fetch_inbox_top(&args[1], &args[2], &args[3]).await?; - println!("**result:\n{}", res.unwrap()); - Ok(()) - } - }) -} - -async fn fetch_inbox_top(imap_server: &str, login: &str, password: &str) -> Result> { - let tls = async_native_tls::TlsConnector::new(); - - // we pass in the imap_server twice to check that the server's TLS - // certificate is valid for the imap_server we're connecting to. - let client = async_imap::connect((imap_server, 993), imap_server, tls).await?; - println!("-- connected to {}:{}", imap_server, 993); - - // the client we have here is unauthenticated. - // to do anything useful with the e-mails, we need to log in - let mut imap_session = client.login(login, password).await.map_err(|e| e.0)?; - println!("-- logged in a {}", login); - - // we want to fetch the first email in the INBOX mailbox - imap_session.select("INBOX").await?; - println!("-- INBOX selected"); - - // fetch message number 1 in this mailbox, along with its RFC822 field. - // RFC 822 dictates the format of the body of e-mails - let messages_stream = imap_session.fetch("1", "RFC822").await?; - let messages: Vec<_> = messages_stream.try_collect().await?; - let message = if let Some(m) = messages.first() { - m - } else { - return Ok(None); - }; - - // extract the message's body - let body = message.body().expect("message did not have a body!"); - let body = std::str::from_utf8(body) - .expect("message was not valid utf-8") - .to_string(); - println!("-- 1 message received, logging out"); - - // be nice to the server and log out - imap_session.logout().await?; - - Ok(Some(body)) -} diff --git a/src/client.rs b/src/client.rs index 45ade70..cc53087 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,13 +4,21 @@ use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::str; +use async_channel::{self as channel, bounded}; use async_native_tls::{TlsConnector, TlsStream}; -use async_std::channel; -use async_std::net::{TcpStream, ToSocketAddrs}; +#[cfg(feature = "runtime-async-std")] +use async_std::{ + io::{Read, Write, WriteExt}, + net::{TcpStream, ToSocketAddrs}, +}; use extensions::quota::parse_get_quota_root; -use futures::io::{self, AsyncRead as Read, AsyncWrite as Write}; -use futures::prelude::*; +use futures::{io, Stream, StreamExt}; use imap_proto::{RequestId, Response}; +#[cfg(feature = "runtime-tokio")] +use tokio::{ + io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt}, + net::{TcpStream, ToSocketAddrs}, +}; use super::authenticator::Authenticator; use super::error::{Error, ParseError, Result, ValidateError}; @@ -359,7 +367,7 @@ impl Session { // not public, just to avoid duplicating the channel creation code fn new(conn: Connection) -> Self { - let (tx, rx) = channel::bounded(100); + let (tx, rx) = bounded(100); Session { conn, unsolicited_responses: rx, @@ -814,12 +822,15 @@ impl Session { /// /// ```no_run /// use async_imap::{types::Seq, Session, error::Result}; - /// use async_std::prelude::*; + /// #[cfg(feature = "runtime-async-std")] /// use async_std::net::TcpStream; + /// #[cfg(feature = "runtime-tokio")] + /// use tokio::net::TcpStream; + /// use futures::TryStreamExt; /// /// async fn delete(seq: Seq, s: &mut Session) -> Result<()> { /// let updates_stream = s.store(format!("{}", seq), "+FLAGS (\\Deleted)").await?; - /// let _updates: Vec<_> = updates_stream.collect::>().await?; + /// let _updates: Vec<_> = updates_stream.try_collect().await?; /// s.expunge().await?; /// Ok(()) /// } @@ -1464,6 +1475,7 @@ mod tests { use super::super::mock_stream::MockStream; use super::*; use std::borrow::Cow; + use std::future::Future; use async_std::sync::{Arc, Mutex}; use imap_proto::Status; @@ -1490,7 +1502,8 @@ mod tests { }; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn fetch_body() { let response = "a0 OK Logged in.\r\n\ * 2 FETCH (BODY[TEXT] {3}\r\nfoo)\r\n\ @@ -1500,7 +1513,8 @@ mod tests { session.read_response().await.unwrap().unwrap(); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn readline_delay_read() { let greeting = "* OK Dovecot ready.\r\n"; let mock_stream = MockStream::default() @@ -1519,7 +1533,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn readline_eof() { let mock_stream = MockStream::default().with_eof(); let mut client = mock_client!(mock_stream); @@ -1527,7 +1542,8 @@ mod tests { assert!(res.is_none()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[should_panic] async fn readline_err() { // TODO Check the error test @@ -1536,7 +1552,8 @@ mod tests { client.read_response().await.unwrap().unwrap(); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn authenticate() { let response = b"+ YmFy\r\n\ A0001 OK Logged in\r\n" @@ -1567,7 +1584,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn login() { let response = b"A0001 OK Logged in\r\n".to_vec(); let username = "username"; @@ -1586,7 +1604,8 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn logout() { let response = b"A0001 OK Logout completed.\r\n".to_vec(); let command = "A0001 LOGOUT\r\n"; @@ -1599,7 +1618,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn rename() { let response = b"A0001 OK RENAME completed\r\n".to_vec(); let current_mailbox_name = "INBOX"; @@ -1621,7 +1641,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn subscribe() { let response = b"A0001 OK SUBSCRIBE completed\r\n".to_vec(); let mailbox = "INBOX"; @@ -1635,7 +1656,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn unsubscribe() { let response = b"A0001 OK UNSUBSCRIBE completed\r\n".to_vec(); let mailbox = "INBOX"; @@ -1649,7 +1671,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn expunge() { let response = b"A0001 OK EXPUNGE completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1661,7 +1684,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_expunge() { let response = b"* 2 EXPUNGE\r\n\ * 3 EXPUNGE\r\n\ @@ -1682,7 +1706,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn check() { let response = b"A0001 OK CHECK completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1694,7 +1719,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn examine() { let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\ * OK [PERMANENTFLAGS ()] Read-only mailbox.\r\n\ @@ -1733,7 +1759,8 @@ mod tests { assert_eq!(mailbox, expected_mailbox); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn select() { let response = b"* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\ * OK [PERMANENTFLAGS (\\* \\Answered \\Flagged \\Deleted \\Draft \\Seen)] \ @@ -1781,7 +1808,8 @@ mod tests { assert_eq!(mailbox, expected_mailbox); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn search() { let response = b"* SEARCH 1 2 3 4 5\r\n\ A0001 OK Search completed\r\n" @@ -1797,7 +1825,8 @@ mod tests { assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_search() { let response = b"* SEARCH 1 2 3 4 5\r\n\ A0001 OK Search completed\r\n" @@ -1813,7 +1842,8 @@ mod tests { assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_search_unordered() { let response = b"* SEARCH 1 2 3 4 5\r\n\ A0002 OK CAPABILITY completed\r\n\ @@ -1830,7 +1860,8 @@ mod tests { assert_eq!(ids, [1, 2, 3, 4, 5].iter().cloned().collect()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn capability() { let response = b"* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n\ A0001 OK CAPABILITY completed\r\n" @@ -1849,7 +1880,8 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn create() { let response = b"A0001 OK CREATE completed\r\n".to_vec(); let mailbox_name = "INBOX"; @@ -1863,7 +1895,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn delete() { let response = b"A0001 OK DELETE completed\r\n".to_vec(); let mailbox_name = "INBOX"; @@ -1877,7 +1910,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn noop() { let response = b"A0001 OK NOOP completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1889,7 +1923,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn close() { let response = b"A0001 OK CLOSE completed\r\n".to_vec(); let mock_stream = MockStream::new(response); @@ -1901,7 +1936,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn store() { generic_store(" ", |c, set, query| async move { c.lock() @@ -1915,7 +1951,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_store() { generic_store(" UID ", |c, set, query| async move { c.lock() @@ -1942,7 +1979,8 @@ mod tests { generic_with_uid(res, "STORE", "2.4", "+FLAGS (\\Deleted)", prefix, op).await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn copy() { generic_copy(" ", |c, set, query| async move { c.lock().await.copy(set, query).await?; @@ -1951,7 +1989,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_copy() { generic_copy(" UID ", |c, set, query| async move { c.lock().await.uid_copy(set, query).await?; @@ -1976,7 +2015,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn mv() { let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\ * 2 EXPUNGE\r\n\ @@ -1994,7 +2034,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_mv() { let response = b"* OK [COPYUID 1511554416 142,399 41:42] Moved UIDs.\r\n\ * 2 EXPUNGE\r\n\ @@ -2012,7 +2053,8 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn fetch() { generic_fetch(" ", |c, seq, query| async move { c.lock() @@ -2027,7 +2069,8 @@ mod tests { .await; } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn uid_fetch() { generic_fetch(" UID ", |c, seq, query| async move { c.lock() diff --git a/src/extensions/idle.rs b/src/extensions/idle.rs index 70f1b88..05a71d7 100644 --- a/src/extensions/idle.rs +++ b/src/extensions/idle.rs @@ -4,11 +4,20 @@ use std::fmt; use std::pin::Pin; use std::time::Duration; -use futures::io::{self, AsyncRead as Read, AsyncWrite as Write}; +#[cfg(feature = "runtime-async-std")] +use async_std::{ + future::timeout, + io::{Read, Write}, +}; use futures::prelude::*; use futures::task::{Context, Poll}; use imap_proto::{RequestId, Response, Status}; use stop_token::prelude::*; +#[cfg(feature = "runtime-tokio")] +use tokio::{ + io::{AsyncRead as Read, AsyncWrite as Write}, + time::timeout, +}; use crate::client::Session; use crate::error::Result; @@ -39,7 +48,7 @@ pub struct Handle { impl Unpin for Handle {} impl Stream for Handle { - type Item = io::Result; + type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().session().get_stream().poll_next(cx) @@ -140,7 +149,7 @@ impl Handle { /// Must be called after [Handle::init]. pub fn wait_with_timeout( &mut self, - timeout: Duration, + dur: Duration, ) -> ( impl Future> + '_, stop_token::StopSource, @@ -152,7 +161,7 @@ impl Handle { let (waiter, interrupt) = self.wait(); let fut = async move { - match async_std::future::timeout(timeout, waiter).await { + match timeout(dur, waiter).await { Ok(res) => res, Err(_err) => Ok(IdleResponse::Timeout), } @@ -179,8 +188,8 @@ impl Handle { } => { if tag == self.id.as_ref().unwrap() { if let Status::Bad = status { - return Err(io::Error::new( - io::ErrorKind::ConnectionRefused, + return Err(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, information.as_ref().unwrap().to_string(), ) .into()); @@ -194,7 +203,7 @@ impl Handle { } } - Err(io::Error::new(io::ErrorKind::ConnectionRefused, "").into()) + Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "").into()) } /// Signal that we want to exit the idle connection, by sending the `DONE` diff --git a/src/extensions/quota.rs b/src/extensions/quota.rs index 0b2676e..64469af 100644 --- a/src/extensions/quota.rs +++ b/src/extensions/quota.rs @@ -1,6 +1,6 @@ //! Adds support for the GETQUOTA and GETQUOTAROOT commands specificed in [RFC2087](https://tools.ietf.org/html/rfc2087). -use async_std::channel; +use async_channel as channel; use futures::io; use futures::prelude::*; use imap_proto::{self, RequestId, Response}; diff --git a/src/imap_stream.rs b/src/imap_stream.rs index 5480530..b97e005 100644 --- a/src/imap_stream.rs +++ b/src/imap_stream.rs @@ -2,13 +2,16 @@ use std::fmt; use std::pin::Pin; use std::sync::Arc; +#[cfg(feature = "runtime-async-std")] +use async_std::io::{Read, Write, WriteExt}; use byte_pool::{Block, BytePool}; -use futures::io::{self, AsyncRead as Read, AsyncWrite as Write}; use futures::stream::Stream; use futures::task::{Context, Poll}; -use futures::AsyncWriteExt; +use futures::{io, ready}; use nom::Needed; use once_cell::sync::Lazy; +#[cfg(feature = "runtime-tokio")] +use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt}; use crate::types::{Request, ResponseData}; @@ -287,13 +290,19 @@ impl Stream for ImapStream { } loop { this.buffer.ensure_capacity(this.decode_needs)?; - let num_bytes_read = - match Pin::new(&mut this.inner).poll_read(cx, this.buffer.free_as_mut_slice()) { - Poll::Ready(result) => result?, - Poll::Pending => { - return Poll::Pending; - } - }; + let buf = this.buffer.free_as_mut_slice(); + + #[cfg(feature = "runtime-async-std")] + let num_bytes_read = ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; + + #[cfg(feature = "runtime-tokio")] + let num_bytes_read = { + let buf = &mut tokio::io::ReadBuf::new(buf); + let start = buf.filled().len(); + ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; + buf.filled().len() - start + }; + if num_bytes_read == 0 { this.closed = true; return Poll::Ready(this.stream_eof_value()); diff --git a/src/lib.rs b/src/lib.rs index aeb238e..4b51f81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,11 @@ #![warn(missing_docs)] #![deny(rust_2018_idioms, unsafe_code)] +#[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))] +compile_error!("one of 'runtime-async-std' or 'runtime-tokio' features must be enabled"); + +#[cfg(all(feature = "runtime-tokio", feature = "runtime-async-std"))] +compile_error!("only one of 'runtime-async-std' or 'runtime-tokio' features must be enabled"); #[macro_use] extern crate pin_utils; diff --git a/src/mock_stream.rs b/src/mock_stream.rs index 8602779..bcf12fd 100644 --- a/src/mock_stream.rs +++ b/src/mock_stream.rs @@ -1,8 +1,12 @@ use std::cmp::min; +use std::io::{Error, ErrorKind, Result}; use std::pin::Pin; +use std::task::{Context, Poll}; -use futures::io::{AsyncRead as Read, AsyncWrite as Write, Error, ErrorKind, Result}; -use futures::task::{Context, Poll}; +#[cfg(feature = "runtime-async-std")] +use async_std::io::{Read, Write}; +#[cfg(feature = "runtime-tokio")] +use tokio::io::{AsyncRead as Read, AsyncWrite as Write}; #[derive(Default, Clone, Debug, Eq, PartialEq, Hash)] pub struct MockStream { @@ -40,6 +44,55 @@ impl MockStream { } } +#[cfg(feature = "runtime-tokio")] +impl Read for MockStream { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if self.eof_on_read { + return Poll::Ready(Ok(())); + } + if self.err_on_read { + return Poll::Ready(Err(Error::new(ErrorKind::Other, "MockStream Error"))); + } + if self.read_pos >= self.read_buf.len() { + return Poll::Ready(Err(Error::new(ErrorKind::UnexpectedEof, "EOF"))); + } + let mut write_len = min(buf.remaining(), self.read_buf.len() - self.read_pos); + if self.read_delay > 0 { + self.read_delay -= 1; + write_len = min(write_len, 1); + } + let max_pos = self.read_pos + write_len; + buf.put_slice(&self.read_buf[self.read_pos..max_pos]); + self.read_pos += write_len; + Poll::Ready(Ok(())) + } +} + +#[cfg(feature = "runtime-tokio")] +impl Write for MockStream { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.written_buf.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[cfg(feature = "runtime-async-std")] impl Read for MockStream { fn poll_read( mut self: Pin<&mut Self>, @@ -69,6 +122,7 @@ impl Read for MockStream { } } +#[cfg(feature = "runtime-async-std")] impl Write for MockStream { fn poll_write( mut self: Pin<&mut Self>, diff --git a/src/parse.rs b/src/parse.rs index b999127..7913b92 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use async_std::channel; +use async_channel as channel; use futures::io; use futures::prelude::*; use futures::stream::Stream; @@ -368,6 +368,7 @@ pub(crate) async fn handle_unilateral( #[cfg(test)] mod tests { use super::*; + use async_channel::bounded; fn input_stream(data: &[&str]) -> Vec> { data.iter() @@ -383,14 +384,15 @@ mod tests { .collect() } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_capability_test() { let expected_capabilities = &["IMAP4rev1", "STARTTLS", "AUTH=GSSAPI", "LOGINDISABLED"]; let responses = input_stream(&["* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n"]); let mut stream = async_std::stream::from_iter(responses); - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let id = RequestId("A0001".into()); let capabilities = parse_capabilities(&mut stream, send, id).await.unwrap(); // shouldn't be any unexpected responses parsed @@ -401,14 +403,15 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_capability_case_insensitive_test() { // Test that "IMAP4REV1" (instead of "IMAP4rev1") is accepted let expected_capabilities = &["IMAP4rev1", "STARTTLS"]; let responses = input_stream(&["* CAPABILITY IMAP4REV1 STARTTLS\r\n"]); let mut stream = async_std::stream::from_iter(responses); - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let id = RequestId("A0001".into()); let capabilities = parse_capabilities(&mut stream, send, id).await.unwrap(); @@ -420,10 +423,11 @@ mod tests { } } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] #[should_panic] async fn parse_capability_invalid_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* JUNK IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n"]); let mut stream = async_std::stream::from_iter(responses); @@ -434,9 +438,10 @@ mod tests { assert!(recv.is_empty()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_names_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* LIST (\\HasNoChildren) \".\" \"INBOX\"\r\n"]); let mut stream = async_std::stream::from_iter(responses); @@ -455,9 +460,10 @@ mod tests { assert_eq!(names[0].name(), "INBOX"); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_fetches_empty() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[]); let mut stream = async_std::stream::from_iter(responses); let id = RequestId("a".into()); @@ -470,9 +476,10 @@ mod tests { assert!(fetches.is_empty()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_fetches_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* 24 FETCH (FLAGS (\\Seen) UID 4827943)\r\n", "* 25 FETCH (FLAGS (\\Seen))\r\n", @@ -499,10 +506,11 @@ mod tests { assert_eq!(fetches[1].header(), None); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_fetches_w_unilateral() { // https://github.com/mattnenterprise/rust-imap/issues/81 - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* 37 FETCH (UID 74)\r\n", "* 1 RECENT\r\n"]); let mut stream = async_std::stream::from_iter(responses); let id = RequestId("a".into()); @@ -518,9 +526,10 @@ mod tests { assert_eq!(fetches[0].uid, Some(74)); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_names_w_unilateral() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* LIST (\\HasNoChildren) \".\" \"INBOX\"\r\n", "* 4 EXPUNGE\r\n", @@ -544,9 +553,10 @@ mod tests { assert_eq!(names[0].name(), "INBOX"); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_capabilities_w_unilateral() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* CAPABILITY IMAP4rev1 STARTTLS AUTH=GSSAPI LOGINDISABLED\r\n", "* STATUS dev.github (MESSAGES 10 UIDNEXT 11 UIDVALIDITY 1408806928 UNSEEN 0)\r\n", @@ -579,9 +589,10 @@ mod tests { assert_eq!(recv.recv().await.unwrap(), UnsolicitedResponse::Exists(4)); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_ids_w_unilateral() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* SEARCH 23 42 4711\r\n", "* 1 RECENT\r\n", @@ -609,9 +620,10 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_ids_test() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "* SEARCH 1600 1698 1739 1781 1795 1885 1891 1892 1893 1898 1899 1901 1911 1926 1932 1933 1993 1994 2007 2032 2033 2041 2053 2062 2063 2065 2066 2072 2078 2079 2082 2084 2095 2100 2101 2102 2103 2104 2107 2116 2120 2135 2138 2154 2163 2168 2172 2189 2193 2198 2199 2205 2212 2213 2221 2227 2267 2275 2276 2295 2300 2328 2330 2332 2333 2334\r\n", "* SEARCH 2335 2336 2337 2338 2339 2341 2342 2347 2349 2350 2358 2359 2362 2369 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2390 2392 2397 2400 2401 2403 2405 2409 2411 2414 2417 2419 2420 2424 2426 2428 2439 2454 2456 2467 2468 2469 2490 2515 2519 2520 2521\r\n", @@ -642,9 +654,10 @@ mod tests { ); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_ids_search() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&["* SEARCH\r\n"]); let mut stream = async_std::stream::from_iter(responses); @@ -656,9 +669,10 @@ mod tests { assert_eq!(ids, HashSet::::new()); } - #[async_std::test] + #[cfg_attr(feature = "runtime-tokio", tokio::test)] + #[cfg_attr(feature = "runtime-async-std", async_std::test)] async fn parse_mailbox_does_not_exist_error() { - let (send, recv) = channel::bounded(10); + let (send, recv) = bounded(10); let responses = input_stream(&[ "A0003 NO Mailbox doesn't exist: DeltaChat (0.001 + 0.140 + 0.139 secs).\r\n", ]); diff --git a/tests/imap_integration.rs b/tests/imap_integration.rs deleted file mode 100644 index 30148c5..0000000 --- a/tests/imap_integration.rs +++ /dev/null @@ -1,322 +0,0 @@ -use std::borrow::Cow; -use std::time::Duration; - -use async_imap::Session; -use async_native_tls::TlsConnector; -use async_std::net::TcpStream; -use async_std::prelude::*; -use async_std::task; - -fn native_tls() -> async_native_tls::TlsConnector { - async_native_tls::TlsConnector::new() - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true) -} - -fn tls() -> TlsConnector { - TlsConnector::new() - .danger_accept_invalid_hostnames(true) - .danger_accept_invalid_certs(true) -} - -fn test_host() -> String { - std::env::var("TEST_HOST").unwrap_or_else(|_| "127.0.0.1".into()) -} - -async fn session(user: &str) -> Session> { - async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) - .await - .unwrap() - .login(user, user) - .await - .ok() - .unwrap() -} - -async fn smtp(user: &str) -> async_smtp::SmtpTransport { - let creds = - async_smtp::smtp::authentication::Credentials::new(user.to_string(), user.to_string()); - async_smtp::SmtpClient::with_security( - &format!("{}:3465", test_host()), - async_smtp::ClientSecurity::Wrapper(async_smtp::ClientTlsParameters { - connector: native_tls(), - domain: "localhost".to_string(), - }), - ) - .await - .expect("Failed to connect to smtp server") - .credentials(creds) - .into_transport() -} - -// #[test] -fn _connect_insecure_then_secure() { - task::block_on(async { - let stream = TcpStream::connect((test_host().as_ref(), 3143)) - .await - .unwrap(); - - // ignored because of https://github.com/greenmail-mail-test/greenmail/issues/135 - async_imap::Client::new(stream) - .secure("imap.example.com", tls()) - .await - .unwrap(); - }); -} - -#[test] -#[ignore] -fn connect_secure() { - task::block_on(async { - async_imap::connect(&format!("{}:3993", test_host()), "imap.example.com", tls()) - .await - .unwrap(); - }); -} - -#[test] -#[ignore] -fn login() { - task::block_on(async { - session("readonly-test@localhost").await; - }); -} - -#[test] -#[ignore] -fn logout() { - task::block_on(async { - let mut s = session("readonly-test@localhost").await; - s.logout().await.unwrap(); - }); -} - -#[test] -#[ignore] -fn inbox_zero() { - task::block_on(async { - // https://github.com/greenmail-mail-test/greenmail/issues/265 - let mut s = session("readonly-test@localhost").await; - s.select("INBOX").await.unwrap(); - let inbox = s.search("ALL").await.unwrap(); - assert_eq!(inbox.len(), 0); - }); -} -fn make_email(to: &str) -> async_smtp::SendableEmail { - let message_id = "abc"; - async_smtp::SendableEmail::new( - async_smtp::Envelope::new( - Some("sender@localhost".parse().unwrap()), - vec![to.parse().unwrap()], - ) - .unwrap(), - message_id, - format!("To: <{}>\r\nFrom: \r\nMessage-ID: <{}.msg@localhost>\r\nSubject: My first e-mail\r\n\r\nHello world from SMTP", to, message_id), - ) -} - -#[test] -#[ignore] -fn inbox() { - task::block_on(async { - let to = "inbox@localhost"; - - // first log in so we'll see the unsolicited e-mails - let mut c = session(to).await; - c.select("INBOX").await.unwrap(); - - println!("sending"); - let mut s = smtp(to).await; - - // then send the e-mail - let mail = make_email(to); - s.connect_and_send(mail).await.unwrap(); - - println!("searching"); - - // now we should see the e-mail! - let inbox = c.search("ALL").await.unwrap(); - // and the one message should have the first message sequence number - assert_eq!(inbox.len(), 1); - assert!(inbox.contains(&1)); - - // we should also get two unsolicited responses: Exists and Recent - c.noop().await.unwrap(); - println!("noop done"); - let mut unsolicited = Vec::new(); - while !c.unsolicited_responses.is_empty() { - unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); - } - - assert_eq!(unsolicited.len(), 2); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); - - println!("fetching"); - - // let's see that we can also fetch the e-mail - let fetch: Vec<_> = c - .fetch("1", "(ALL UID)") - .await - .unwrap() - .collect::, _>>() - .await - .unwrap(); - assert_eq!(fetch.len(), 1); - let fetch = &fetch[0]; - assert_eq!(fetch.message, 1); - assert_ne!(fetch.uid, None); - assert_eq!(fetch.size, Some(21)); - let e = fetch.envelope().unwrap(); - assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); - assert_ne!(e.from, None); - assert_eq!(e.from.as_ref().unwrap().len(), 1); - let from = &e.from.as_ref().unwrap()[0]; - assert_eq!(from.mailbox, Some(Cow::Borrowed(&b"sender"[..]))); - assert_eq!(from.host, Some(Cow::Borrowed(&b"localhost"[..]))); - assert_ne!(e.to, None); - assert_eq!(e.to.as_ref().unwrap().len(), 1); - let to = &e.to.as_ref().unwrap()[0]; - assert_eq!(to.mailbox, Some(Cow::Borrowed(&b"inbox"[..]))); - assert_eq!(to.host, Some(Cow::Borrowed(&b"localhost"[..]))); - let date_opt = fetch.internal_date(); - assert!(date_opt.is_some()); - - // and let's delete it to clean up - c.store("1", "+FLAGS (\\Deleted)") - .await - .unwrap() - .collect::>() - .await; - c.expunge().await.unwrap().collect::>().await; - - // the e-mail should be gone now - let inbox = c.search("ALL").await.unwrap(); - assert_eq!(inbox.len(), 0); - }); -} - -#[test] -#[ignore] -fn inbox_uid() { - task::block_on(async { - let to = "inbox-uid@localhost"; - - // first log in so we'll see the unsolicited e-mails - let mut c = session(to).await; - c.select("INBOX").await.unwrap(); - - // then send the e-mail - let mut s = smtp(to).await; - let e = make_email(to); - s.connect_and_send(e).await.unwrap(); - - // now we should see the e-mail! - let inbox = c.uid_search("ALL").await.unwrap(); - // and the one message should have the first message sequence number - assert_eq!(inbox.len(), 1); - let uid = inbox.into_iter().next().unwrap(); - - // we should also get two unsolicited responses: Exists and Recent - c.noop().await.unwrap(); - let mut unsolicited = Vec::new(); - while !c.unsolicited_responses.is_empty() { - unsolicited.push(c.unsolicited_responses.recv().await.unwrap()); - } - - assert_eq!(unsolicited.len(), 2); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Exists(1))); - assert!(unsolicited - .iter() - .any(|m| m == &async_imap::types::UnsolicitedResponse::Recent(1))); - - // let's see that we can also fetch the e-mail - let fetch: Vec<_> = c - .uid_fetch(format!("{}", uid), "(ALL UID)") - .await - .unwrap() - .collect::>() - .await - .unwrap(); - assert_eq!(fetch.len(), 1); - let fetch = &fetch[0]; - assert_eq!(fetch.uid, Some(uid)); - let e = fetch.envelope().unwrap(); - assert_eq!(e.subject, Some(Cow::Borrowed(&b"My first e-mail"[..]))); - let date_opt = fetch.internal_date(); - assert!(date_opt.is_some()); - - // and let's delete it to clean up - c.uid_store(format!("{}", uid), "+FLAGS (\\Deleted)") - .await - .unwrap() - .collect::>() - .await; - c.expunge().await.unwrap().collect::>().await; - - // the e-mail should be gone now - let inbox = c.search("ALL").await.unwrap(); - assert_eq!(inbox.len(), 0); - }); -} - -// #[test] -fn _list() { - task::block_on(async { - let mut s = session("readonly-test@localhost").await; - s.select("INBOX").await.unwrap(); - let subdirs: Vec<_> = s.list(None, Some("%")).await.unwrap().collect().await; - assert_eq!(subdirs.len(), 0); - - // TODO: make a subdir - }); -} - -// Greenmail does not support IDLE :( -// #[test] -fn _idle() -> async_imap::error::Result<()> { - task::block_on(async { - let mut session = session("idle-test@localhost").await; - - // get that inbox - let res = session.select("INBOX").await?; - println!("selected: {:#?}", res); - - // fetchy fetch - let msg_stream = session.fetch("1:3", "(FLAGS BODY.PEEK[])").await?; - let msgs = msg_stream.collect::>().await; - println!("msgs: {:?}", msgs.len()); - - // Idle session - println!("starting idle"); - let mut idle = session.idle(); - idle.init().await?; - - let (idle_wait, interrupt) = idle.wait_with_timeout(std::time::Duration::from_secs(30)); - println!("idle wait"); - - task::spawn(async move { - println!("waiting for 1s"); - task::sleep(Duration::from_secs(2)).await; - println!("interrupting idle"); - drop(interrupt); - }); - - let idle_result = idle_wait.await; - println!("idle result: {:#?}", &idle_result); - - // return the session after we are done with it - let mut session = idle.done().await?; - - println!("logging out"); - session.logout().await?; - - Ok(()) - }) -} From e282e7edc4dd2a2f218dfe2842bf67149c8710e6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 27 Jun 2022 13:02:44 +0200 Subject: [PATCH 5/5] fix: remove unused pin --- examples/src/bin/gmail_oauth2.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/examples/src/bin/gmail_oauth2.rs b/examples/src/bin/gmail_oauth2.rs index 2c0756b..da68bd2 100644 --- a/examples/src/bin/gmail_oauth2.rs +++ b/examples/src/bin/gmail_oauth2.rs @@ -49,9 +49,6 @@ async fn main() -> Result<()> { e })?; - // TODO: get rid of this - let mut msgs = unsafe { std::pin::Pin::new_unchecked(&mut msgs) }; - while let Some(msg) = msgs.next().await { print!("{:?}", msg?); }