From 814f4285faf1a47fbf67d18471322c6e922adf26 Mon Sep 17 00:00:00 2001 From: Anisov Date: Thu, 26 Nov 2020 14:29:45 +0300 Subject: [PATCH 1/9] Add processing aiohttp StreamReader for proxying data from a request. --- gql/transport/aiohttp.py | 3 ++- gql/utils.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index 857edfab..1486bf36 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -175,7 +175,8 @@ async def execute( data.add_field("map", file_map_str, content_type="application/json") # Add the extracted files as remaining fields - data.add_fields(*file_streams.items()) + for k, v in file_streams.items(): + data.add_field(k, v, filename=k) post_args: Dict[str, Any] = {"data": data} diff --git a/gql/utils.py b/gql/utils.py index ce0318b0..829113a7 100644 --- a/gql/utils.py +++ b/gql/utils.py @@ -3,6 +3,8 @@ import io from typing import Any, Dict, Tuple +from aiohttp import StreamReader + # From this response in Stackoverflow # http://stackoverflow.com/a/19053800/1072990 @@ -14,8 +16,8 @@ def to_camel_case(snake_str): def is_file_like(value: Any) -> bool: - """Check if a value represents a file like object""" - return isinstance(value, io.IOBase) + """Check if a value represents a file like object or StreamReader""" + return isinstance(value, io.IOBase) or isinstance(value, StreamReader) def extract_files(variables: Dict) -> Tuple[Dict, Dict]: From 4859f67d73e2254af777b9bb59c040ce567eb328 Mon Sep 17 00:00:00 2001 From: Anisov Date: Thu, 26 Nov 2020 15:30:47 +0300 Subject: [PATCH 2/9] Fix tests and refactoring --- gql/transport/aiohttp.py | 4 +++- gql/utils.py | 20 ++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index 1486bf36..4f68eedf 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -144,7 +144,9 @@ async def execute( # If we upload files, we will extract the files present in the # variable_values dict and replace them by null values - nulled_variable_values, files = extract_files(variable_values) + nulled_variable_values, files = extract_files( + variable_values, (aiohttp.StreamReader,) + ) # Save the nulled variable values in the payload payload["variables"] = nulled_variable_values diff --git a/gql/utils.py b/gql/utils.py index 829113a7..056ebe1b 100644 --- a/gql/utils.py +++ b/gql/utils.py @@ -1,9 +1,7 @@ """Utilities to manipulate several python objects.""" import io -from typing import Any, Dict, Tuple - -from aiohttp import StreamReader +from typing import Any, Dict, Optional, Tuple # From this response in Stackoverflow @@ -15,12 +13,18 @@ def to_camel_case(snake_str): return components[0] + "".join(x.title() if x else "_" for x in components[1:]) -def is_file_like(value: Any) -> bool: - """Check if a value represents a file like object or StreamReader""" - return isinstance(value, io.IOBase) or isinstance(value, StreamReader) +def is_file_like(value: Any, additional_file_like_instances: Tuple[Any]) -> bool: + """Check if a value represents a file like object""" + if additional_file_like_instances: + return isinstance(value, io.IOBase) or isinstance( + value, additional_file_like_instances + ) + return isinstance(value, io.IOBase) -def extract_files(variables: Dict) -> Tuple[Dict, Dict]: +def extract_files( + variables: Dict, additional_file_like_instances: Optional[Tuple[Any]] +) -> Tuple[Dict, Dict]: files = {} def recurse_extract(path, obj): @@ -42,7 +46,7 @@ def recurse_extract(path, obj): value = recurse_extract(f"{path}.{key}", value) nulled_obj[key] = value return nulled_obj - elif is_file_like(obj): + elif is_file_like(obj, additional_file_like_instances): # extract obj from its parent and put it into files instead. files[path] = obj return None From a433f74cc3ffae0569f6c1beb222cc5f66cde2e0 Mon Sep 17 00:00:00 2001 From: Anisov Date: Thu, 26 Nov 2020 16:03:51 +0300 Subject: [PATCH 3/9] Some fix and add info in upload docs --- docs/usage/file_upload.rst | 31 +++++++++++++++++++++++++++++++ gql/utils.py | 6 ++++-- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/docs/usage/file_upload.rst b/docs/usage/file_upload.rst index d900df95..c39e5afe 100644 --- a/docs/usage/file_upload.rst +++ b/docs/usage/file_upload.rst @@ -67,3 +67,34 @@ It is also possible to upload multiple files using a list. f1.close() f2.close() + + +Aiohttp StreamReader +-------------------- + +In order to upload a aiohttp StreamReader, you need to: + +* get response from aiohttp request and then get StreamReader from `resp.content` +* provide the StreamReader to the `variable_values` argument of `execute` +* set the `upload_files` argument to True + + +.. code-block:: python + + async with ClientSession() as client: + async with client.get('YOUR_URL') as resp: + transport = AIOHTTPTransport(url='YOUR_URL') + client = Client(transport=transport) + query = gql(''' + mutation($file: Upload!) { + singleUpload(file: $file) { + id + } + } + ''') + + params = {"file": resp.content} + + result = client.execute( + query, variable_values=params, upload_files=True + ) \ No newline at end of file diff --git a/gql/utils.py b/gql/utils.py index 056ebe1b..5362efa5 100644 --- a/gql/utils.py +++ b/gql/utils.py @@ -13,7 +13,9 @@ def to_camel_case(snake_str): return components[0] + "".join(x.title() if x else "_" for x in components[1:]) -def is_file_like(value: Any, additional_file_like_instances: Tuple[Any]) -> bool: +def is_file_like( + value: Any, additional_file_like_instances: Optional[Tuple[Any]] = None +) -> bool: """Check if a value represents a file like object""" if additional_file_like_instances: return isinstance(value, io.IOBase) or isinstance( @@ -23,7 +25,7 @@ def is_file_like(value: Any, additional_file_like_instances: Tuple[Any]) -> bool def extract_files( - variables: Dict, additional_file_like_instances: Optional[Tuple[Any]] + variables: Dict, additional_file_like_instances: Optional[Tuple[Any]] = None ) -> Tuple[Dict, Dict]: files = {} From 8985cf32d955460690294b153edac93ce3f28d94 Mon Sep 17 00:00:00 2001 From: Anisov Date: Thu, 26 Nov 2020 16:39:09 +0300 Subject: [PATCH 4/9] Refactoring obj instance check in extract_files --- gql/transport/aiohttp.py | 4 +++- gql/utils.py | 18 +++--------------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index 4f68eedf..023835ab 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -1,3 +1,4 @@ +import io import json import logging from ssl import SSLContext @@ -145,7 +146,8 @@ async def execute( # If we upload files, we will extract the files present in the # variable_values dict and replace them by null values nulled_variable_values, files = extract_files( - variable_values, (aiohttp.StreamReader,) + variables=variable_values, + file_classes=(io.IOBase, aiohttp.StreamReader), ) # Save the nulled variable values in the payload diff --git a/gql/utils.py b/gql/utils.py index 5362efa5..3edb086c 100644 --- a/gql/utils.py +++ b/gql/utils.py @@ -1,7 +1,6 @@ """Utilities to manipulate several python objects.""" -import io -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Tuple, Type # From this response in Stackoverflow @@ -13,19 +12,8 @@ def to_camel_case(snake_str): return components[0] + "".join(x.title() if x else "_" for x in components[1:]) -def is_file_like( - value: Any, additional_file_like_instances: Optional[Tuple[Any]] = None -) -> bool: - """Check if a value represents a file like object""" - if additional_file_like_instances: - return isinstance(value, io.IOBase) or isinstance( - value, additional_file_like_instances - ) - return isinstance(value, io.IOBase) - - def extract_files( - variables: Dict, additional_file_like_instances: Optional[Tuple[Any]] = None + variables: Dict, file_classes: Tuple[Type[Any], ...] ) -> Tuple[Dict, Dict]: files = {} @@ -48,7 +36,7 @@ def recurse_extract(path, obj): value = recurse_extract(f"{path}.{key}", value) nulled_obj[key] = value return nulled_obj - elif is_file_like(obj, additional_file_like_instances): + elif isinstance(obj, file_classes): # extract obj from its parent and put it into files instead. files[path] = obj return None From 9e668a856f54a8bee52a30750a77e4ee3763c8de Mon Sep 17 00:00:00 2001 From: Anisov Date: Thu, 26 Nov 2020 17:16:33 +0300 Subject: [PATCH 5/9] Add class param for file classes --- gql/transport/aiohttp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index 023835ab..97fcdf58 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -29,6 +29,7 @@ class AIOHTTPTransport(AsyncTransport): This transport use the aiohttp library with asyncio. """ + file_classes = (io.IOBase, aiohttp.StreamReader) def __init__( self, @@ -147,7 +148,7 @@ async def execute( # variable_values dict and replace them by null values nulled_variable_values, files = extract_files( variables=variable_values, - file_classes=(io.IOBase, aiohttp.StreamReader), + file_classes=self.file_classes, ) # Save the nulled variable values in the payload From e53f3895a52be7cde0a141114e299c65fdd7b727 Mon Sep 17 00:00:00 2001 From: Anisov Date: Thu, 26 Nov 2020 17:22:42 +0300 Subject: [PATCH 6/9] Fix lint --- gql/transport/aiohttp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index 97fcdf58..28ffd76e 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -29,6 +29,7 @@ class AIOHTTPTransport(AsyncTransport): This transport use the aiohttp library with asyncio. """ + file_classes = (io.IOBase, aiohttp.StreamReader) def __init__( @@ -147,8 +148,7 @@ async def execute( # If we upload files, we will extract the files present in the # variable_values dict and replace them by null values nulled_variable_values, files = extract_files( - variables=variable_values, - file_classes=self.file_classes, + variables=variable_values, file_classes=self.file_classes, ) # Save the nulled variable values in the payload From 3ea5a15cd7414120b301fc1cfa63d7b5eb305424 Mon Sep 17 00:00:00 2001 From: Anisov Date: Sat, 28 Nov 2020 19:49:01 +0300 Subject: [PATCH 7/9] Added async generator for upload, wrote docs, added tests for new features --- docs/usage/file_upload.rst | 39 ++++++++++++++++++- gql/transport/aiohttp.py | 8 +++- setup.py | 1 + tests/test_aiohttp.py | 78 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 3 deletions(-) diff --git a/docs/usage/file_upload.rst b/docs/usage/file_upload.rst index c39e5afe..00198b4b 100644 --- a/docs/usage/file_upload.rst +++ b/docs/usage/file_upload.rst @@ -97,4 +97,41 @@ In order to upload a aiohttp StreamReader, you need to: result = client.execute( query, variable_values=params, upload_files=True - ) \ No newline at end of file + ) + +Asynchronous Generator +---------------------- + +In order to upload a single file use asynchronous generator(https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads), you need to: + +* сreate a asynchronous generator +* set the generator as a variable value in the mutation +* provide the opened file to the `variable_values` argument of `execute` +* set the `upload_files` argument to True + +.. code-block:: python + + transport = AIOHTTPTransport(url='YOUR_URL') + + client = Client(transport=sample_transport) + + query = gql(''' + mutation($file: Upload!) { + singleUpload(file: $file) { + id + } + } + ''') + + async def file_sender(file_name=None): + async with aiofiles.open(file_name, 'rb') as f: + chunk = await f.read(64*1024) + while chunk: + yield chunk + chunk = await f.read(64*1024) + + params = {"file": file_sender(file_name='YOUR_FILE_PATH')} + + result = client.execute( + query, variable_values=params, upload_files=True + ) \ No newline at end of file diff --git a/gql/transport/aiohttp.py b/gql/transport/aiohttp.py index 28ffd76e..b1a33ad2 100644 --- a/gql/transport/aiohttp.py +++ b/gql/transport/aiohttp.py @@ -2,7 +2,7 @@ import json import logging from ssl import SSLContext -from typing import Any, AsyncGenerator, Dict, Optional, Union +from typing import Any, AsyncGenerator, Dict, Optional, Tuple, Type, Union import aiohttp from aiohttp.client_exceptions import ClientResponseError @@ -30,7 +30,11 @@ class AIOHTTPTransport(AsyncTransport): This transport use the aiohttp library with asyncio. """ - file_classes = (io.IOBase, aiohttp.StreamReader) + file_classes: Tuple[Type[Any], ...] = ( + io.IOBase, + aiohttp.StreamReader, + AsyncGenerator, + ) def __init__( self, diff --git a/setup.py b/setup.py index 2014dcf3..fdcaccf0 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ "pytest-cov==2.8.1", "mock==4.0.2", "vcrpy==4.0.2", + "aiofiles", ] dev_requires = [ diff --git a/tests/test_aiohttp.py b/tests/test_aiohttp.py index 66a29dbc..649b0f40 100644 --- a/tests/test_aiohttp.py +++ b/tests/test_aiohttp.py @@ -582,6 +582,84 @@ async def test_aiohttp_binary_file_upload(event_loop, aiohttp_server): assert success +@pytest.mark.asyncio +async def test_aiohttp_stream_response_upload(event_loop, aiohttp_server): + from aiohttp import web, ClientSession + from gql.transport.aiohttp import AIOHTTPTransport + + async def binary_data_handler(request): + return web.Response( + body=binary_file_content, content_type="binary/octet-stream" + ) + + app = web.Application() + app.router.add_route("POST", "/", binary_upload_handler) + app.router.add_route("GET", "/binary_data", binary_data_handler) + + server = await aiohttp_server(app) + + url = server.make_url("/") + binary_data_url = server.make_url("/binary_data") + + sample_transport = AIOHTTPTransport(url=url, timeout=10) + + async with Client(transport=sample_transport) as session: + query = gql(file_upload_mutation_1) + async with ClientSession() as client: + async with client.get(binary_data_url) as resp: + params = {"file": resp.content, "other_var": 42} + + # Execute query asynchronously + result = await session.execute( + query, variable_values=params, upload_files=True + ) + + success = result["success"] + + assert success + + +@pytest.mark.asyncio +async def test_aiohttp_async_generator_upload(event_loop, aiohttp_server): + import aiofiles + from aiohttp import web + from gql.transport.aiohttp import AIOHTTPTransport + + app = web.Application() + app.router.add_route("POST", "/", binary_upload_handler) + server = await aiohttp_server(app) + + url = server.make_url("/") + + sample_transport = AIOHTTPTransport(url=url, timeout=10) + + with TemporaryFile(binary_file_content) as test_file: + + async with Client(transport=sample_transport,) as session: + + query = gql(file_upload_mutation_1) + + file_path = test_file.filename + + async def file_sender(file_name=None): + async with aiofiles.open(file_name, "rb") as f: + chunk = await f.read(64 * 1024) + while chunk: + yield chunk + chunk = await f.read(64 * 1024) + + params = {"file": file_sender(file_path), "other_var": 42} + + # Execute query asynchronously + result = await session.execute( + query, variable_values=params, upload_files=True + ) + + success = result["success"] + + assert success + + file_upload_mutation_2 = """ mutation($file1: Upload!, $file2: Upload!) { uploadFile(input:{file1:$file, file2:$file}) { From 68ce6bb8134bfd401fab04471c637d82a7594ef9 Mon Sep 17 00:00:00 2001 From: Anisov Date: Sat, 28 Nov 2020 21:44:39 +0300 Subject: [PATCH 8/9] Fix test name --- tests/test_aiohttp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_aiohttp.py b/tests/test_aiohttp.py index 649b0f40..a1dec30a 100644 --- a/tests/test_aiohttp.py +++ b/tests/test_aiohttp.py @@ -583,7 +583,7 @@ async def test_aiohttp_binary_file_upload(event_loop, aiohttp_server): @pytest.mark.asyncio -async def test_aiohttp_stream_response_upload(event_loop, aiohttp_server): +async def test_aiohttp_stream_reader_upload(event_loop, aiohttp_server): from aiohttp import web, ClientSession from gql.transport.aiohttp import AIOHTTPTransport From e5a9508af3e81ed2d7aa2a71398752ff46230edc Mon Sep 17 00:00:00 2001 From: Hanusz Leszek Date: Sun, 29 Nov 2020 12:24:14 +0100 Subject: [PATCH 9/9] docs modifications + remove None default of the file_sender generator --- docs/usage/file_upload.rst | 108 +++++++++++++++++++++++++------------ tests/test_aiohttp.py | 2 +- 2 files changed, 75 insertions(+), 35 deletions(-) diff --git a/docs/usage/file_upload.rst b/docs/usage/file_upload.rst index 00198b4b..d5f07c50 100644 --- a/docs/usage/file_upload.rst +++ b/docs/usage/file_upload.rst @@ -19,7 +19,7 @@ In order to upload a single file, you need to: transport = AIOHTTPTransport(url='YOUR_URL') - client = Client(transport=sample_transport) + client = Client(transport=transport) query = gql(''' mutation($file: Upload!) { @@ -46,7 +46,7 @@ It is also possible to upload multiple files using a list. transport = AIOHTTPTransport(url='YOUR_URL') - client = Client(transport=sample_transport) + client = Client(transport=transport) query = gql(''' mutation($files: [Upload!]!) { @@ -69,51 +69,45 @@ It is also possible to upload multiple files using a list. f2.close() -Aiohttp StreamReader --------------------- +Streaming +--------- -In order to upload a aiohttp StreamReader, you need to: +If you use the above methods to send files, then the entire contents of the files +must be loaded in memory before the files are sent. +If the files are not too big and you have enough RAM, it is not a problem. +On another hand if you want to avoid using too much memory, then it is better +to read the files and send them in small chunks so that the entire file contents +don't have to be in memory at once. -* get response from aiohttp request and then get StreamReader from `resp.content` -* provide the StreamReader to the `variable_values` argument of `execute` -* set the `upload_files` argument to True +We provide methods to do that for two different uses cases: +* Sending local files +* Streaming downloaded files from an external URL to the GraphQL API -.. code-block:: python +Streaming local files +^^^^^^^^^^^^^^^^^^^^^ - async with ClientSession() as client: - async with client.get('YOUR_URL') as resp: - transport = AIOHTTPTransport(url='YOUR_URL') - client = Client(transport=transport) - query = gql(''' - mutation($file: Upload!) { - singleUpload(file: $file) { - id - } - } - ''') +aiohttp allows to upload files using an asynchronous generator. +See `Streaming uploads on aiohttp docs`_. - params = {"file": resp.content} - result = client.execute( - query, variable_values=params, upload_files=True - ) +In order to stream local files, instead of providing opened files to the +`variables_values` argument of `execute`, you need to provide an async generator +which will provide parts of the files. -Asynchronous Generator ----------------------- +You can use `aiofiles`_ +to read the files in chunks and create this asynchronous generator. -In order to upload a single file use asynchronous generator(https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads), you need to: +.. _Streaming uploads on aiohttp docs: https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads +.. _aiofiles: https://github.com/Tinche/aiofiles -* сreate a asynchronous generator -* set the generator as a variable value in the mutation -* provide the opened file to the `variable_values` argument of `execute` -* set the `upload_files` argument to True +Example: .. code-block:: python transport = AIOHTTPTransport(url='YOUR_URL') - client = Client(transport=sample_transport) + client = Client(transport=transport) query = gql(''' mutation($file: Upload!) { @@ -123,7 +117,7 @@ In order to upload a single file use asynchronous generator(https://docs.aiohttp } ''') - async def file_sender(file_name=None): + async def file_sender(file_name): async with aiofiles.open(file_name, 'rb') as f: chunk = await f.read(64*1024) while chunk: @@ -134,4 +128,50 @@ In order to upload a single file use asynchronous generator(https://docs.aiohttp result = client.execute( query, variable_values=params, upload_files=True - ) \ No newline at end of file + ) + +Streaming downloaded files +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If the file you want to upload to the GraphQL API is not present locally +and needs to be downloaded from elsewhere, then it is possible to chain the download +and the upload in order to limit the amout of memory used. + +Because the `content` attribute of an aiohttp response is a `StreamReader` +(it provides an async iterator protocol), you can chain the download and the upload +together. + +In order to do that, you need to: + +* get the response from an aiohttp request and then get the StreamReader instance + from `resp.content` +* provide the StreamReader instance to the `variable_values` argument of `execute` + +Example: + +.. code-block:: python + + # First request to download your file with aiohttp + async with aiohttp.ClientSession() as http_client: + async with http_client.get('YOUR_DOWNLOAD_URL') as resp: + + # We now have a StreamReader instance in resp.content + # and we provide it to the variable_values argument of execute + + transport = AIOHTTPTransport(url='YOUR_GRAPHQL_URL') + + client = Client(transport=transport) + + query = gql(''' + mutation($file: Upload!) { + singleUpload(file: $file) { + id + } + } + ''') + + params = {"file": resp.content} + + result = client.execute( + query, variable_values=params, upload_files=True + ) diff --git a/tests/test_aiohttp.py b/tests/test_aiohttp.py index a1dec30a..0bf8c1ba 100644 --- a/tests/test_aiohttp.py +++ b/tests/test_aiohttp.py @@ -641,7 +641,7 @@ async def test_aiohttp_async_generator_upload(event_loop, aiohttp_server): file_path = test_file.filename - async def file_sender(file_name=None): + async def file_sender(file_name): async with aiofiles.open(file_name, "rb") as f: chunk = await f.read(64 * 1024) while chunk: