CHANGELOG.md: 2 changes (1 addition, 1 deletion)
@@ -1,3 +1,3 @@
## Current (in progress)

-- Initial version
+- Add endpoint to stream a CSV response [#5](https://github.com/etalab/api-tabular/pull/5)
api_tabular/app.py: 40 changes (38 additions, 2 deletions)
@@ -5,7 +5,11 @@

from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from api_tabular import config
-from api_tabular.query import get_resource, get_resource_data
+from api_tabular.query import (
+    get_resource,
+    get_resource_data,
+    get_resource_data_streamed,
+)
from api_tabular.utils import build_sql_query_string, build_link_with_page
from api_tabular.error import QueryException

@@ -61,7 +65,9 @@ async def resource_data(request):
page_size = int(request.query.get("page_size", config.PAGE_SIZE_DEFAULT))

if page_size > config.PAGE_SIZE_MAX:
-        raise QueryException(400, None, "Invalid query string", "Page size exceeds allowed maximum")
+        raise QueryException(
+            400, None, "Invalid query string", "Page size exceeds allowed maximum"
+        )
if page > 1:
offset = page_size * (page - 1)
else:
@@ -93,6 +99,36 @@ async def resource_data(request):
return web.json_response(body)


@routes.get(r"/api/resources/{rid}/data/csv/")
async def resource_data_csv(request):
resource_id = request.match_info["rid"]
query_string = request.query_string.split("&") if request.query_string else []

try:
sql_query = build_sql_query_string(query_string)
except ValueError:
raise QueryException(400, None, "Invalid query string", "Malformed query")

resource = await get_resource(
request.app["csession"], resource_id, ["parsing_table"]
)

response_headers = {
"Content-Disposition": f'attachment; filename="{resource_id}.csv"',
"Content-Type": "text/csv",
}
response = web.StreamResponse(headers=response_headers)
await response.prepare(request)

async for chunk in get_resource_data_streamed(
request.app["csession"], resource, sql_query
):
await response.write(chunk)

await response.write_eof()
return response


async def app_factory():
async def on_startup(app):
app["csession"] = ClientSession()
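
For reference, a minimal Python client for the new streaming endpoint. The host, port, and resource id below are illustrative assumptions, not part of this PR:

import asyncio

import aiohttp

BASE_URL = "http://localhost:8005"  # assumed local deployment
RESOURCE_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical id


async def download_csv() -> None:
    # Stream the body to disk chunk by chunk instead of buffering it whole,
    # which is the point of the new endpoint.
    url = f"{BASE_URL}/api/resources/{RESOURCE_ID}/data/csv/"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            res.raise_for_status()
            with open(f"{RESOURCE_ID}.csv", "wb") as f:
                async for chunk in res.content.iter_chunked(1024):
                    f.write(chunk)


asyncio.run(download_csv())
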
api_tabular/error.py: 4 changes (3 additions, 1 deletion)
@@ -16,7 +16,9 @@ def __init__(self, status, error_code, title, detail) -> None:
super().__init__(content_type="application/json", text=json.dumps(error_body))


-def handle_exception(status: int, title: str, detail: Union[str, dict], resource_id: str = None):
+def handle_exception(
+    status: int, title: str, detail: Union[str, dict], resource_id: str = None
+):
event_id = None
e = Exception(detail)
if config.SENTRY_DSN:
api_tabular/metrics.py: 57 changes (48 additions, 9 deletions)
@@ -5,7 +5,11 @@

from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from api_tabular import config
-from api_tabular.utils import build_sql_query_string, build_link_with_page, process_total
+from api_tabular.utils import (
+    build_sql_query_string,
+    build_link_with_page,
+    process_total,
+)
from api_tabular.error import QueryException, handle_exception

routes = web.RouteTableDef()
@@ -22,23 +26,35 @@ async def get_object_data(session: ClientSession, model: str, sql_query: str):
url = f"{config.PG_RST_URL}/{model}?{sql_query}"
async with session.get(url, headers=headers) as res:
if not res.ok:
-            handle_exception(
-                res.status, "Database error", await res.json(), None
-            )
+            handle_exception(res.status, "Database error", await res.json(), None)
record = await res.json()
total = process_total(res.headers.get("Content-Range"))
return record, total


async def get_object_data_streamed(
session: ClientSession, model: str, sql_query: str, accept_format: str = "text/csv"
):
headers = {"Accept": accept_format}
url = f"{config.PG_RST_URL}/{model}?{sql_query}"
async with session.get(url, headers=headers) as res:
if not res.ok:
handle_exception(res.status, "Database error", await res.json(), None)
async for chunk in res.content.iter_chunked(1024):
yield chunk


@routes.get(r"/api/{model}/data/")
-async def resource_data(request):
+async def metrics_data(request):
model = request.match_info["model"]
query_string = request.query_string.split("&") if request.query_string else []
page = int(request.query.get("page", "1"))
page_size = int(request.query.get("page_size", config.PAGE_SIZE_DEFAULT))

if page_size > config.PAGE_SIZE_MAX:
-        raise QueryException(400, None, "Invalid query string", "Page size exceeds allowed maximum")
+        raise QueryException(
+            400, None, "Invalid query string", "Page size exceeds allowed maximum"
+        )
if page > 1:
offset = page_size * (page - 1)
else:
@@ -49,9 +65,7 @@ async def resource_data(request):
except ValueError:
raise QueryException(400, None, "Invalid query string", "Malformed query")

-    response, total = await get_object_data(
-        request.app["csession"], model, sql_query
-    )
+    response, total = await get_object_data(request.app["csession"], model, sql_query)

next = build_link_with_page(request.path, query_string, page + 1, page_size)
prev = build_link_with_page(request.path, query_string, page - 1, page_size)
@@ -66,6 +80,31 @@ async def resource_data(request):
return web.json_response(body)


@routes.get(r"/api/{model}/data/csv/")
async def metrics_data_csv(request):
model = request.match_info["model"]
query_string = request.query_string.split("&") if request.query_string else []

try:
sql_query = build_sql_query_string(query_string)
except ValueError:
raise QueryException(400, None, "Invalid query string", "Malformed query")

response_headers = {
"Content-Disposition": f'attachment; filename="{model}.csv"',
"Content-Type": "text/csv",
}
response = web.StreamResponse(headers=response_headers)
await response.prepare(request)

async for chunk in get_object_data_streamed(
request.app["csession"], model, sql_query
):
await response.write(chunk)

    # Explicitly finish the stream, matching resource_data_csv in app.py.
    await response.write_eof()
    return response


async def app_factory():
async def on_startup(app):
app["csession"] = ClientSession()
api_tabular/query.py: 19 changes (18 additions, 1 deletion)
@@ -22,8 +22,25 @@ async def get_resource_data(session: ClientSession, resource: dict, sql_query: str):
async with session.get(url, headers=headers) as res:
if not res.ok:
            handle_exception(
-                res.status, "Database error", await res.json(), resource.get('id')
+                res.status, "Database error", await res.json(), resource.get("id")
            )
record = await res.json()
total = process_total(res.headers.get("Content-Range"))
return record, total


async def get_resource_data_streamed(
session: ClientSession,
resource: dict,
sql_query: str,
accept_format: str = "text/csv",
):
headers = {"Accept": accept_format}
url = f"{config.PG_RST_URL}/{resource['parsing_table']}?{sql_query}"
async with session.get(url, headers=headers) as res:
if not res.ok:
handle_exception(
res.status, "Database error", await res.json(), resource.get("id")
)
async for chunk in res.content.iter_chunked(1024):
yield chunk
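
PostgREST performs the CSV serialization itself when the request carries Accept: text/csv, so this coroutine only relays the body in 1024-byte chunks. A minimal consumption sketch, assuming an open ClientSession and a resource dict that already holds a parsing_table key:

async def dump_resource(session, resource):
    # An empty query string carries no limit clause, so PostgREST
    # returns every row of the parsing table.
    with open("export.csv", "wb") as f:
        async for chunk in get_resource_data_streamed(session, resource, ""):
            f.write(chunk)
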
api_tabular/utils.py: 7 changes (5 additions, 2 deletions)
@@ -1,4 +1,6 @@
-def build_sql_query_string(request_arg: list, page_size: int, offset: int = 0) -> str:
+def build_sql_query_string(
+    request_arg: list, page_size: int = None, offset: int = 0
+) -> str:
sql_query = []
for arg in request_arg:
argument, value = arg.split("=")
@@ -18,7 +20,8 @@ def build_sql_query_string(request_arg: list, page_size: int, offset: int = 0) -> str:
sql_query.append(f"{column}=lte.{value}")
elif normalized_comparator == "greater":
sql_query.append(f"{column}=gte.{value}")
-    sql_query.append(f"limit={page_size}")
+    if page_size:
+        sql_query.append(f"limit={page_size}")
if offset >= 1:
sql_query.append(f"offset={offset}")
return "&".join(sql_query)
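
A quick illustration of the now-optional page_size, consistent with the tests below; the expected outputs follow from the builder's branches:

from api_tabular.utils import build_sql_query_string

# With pagination, a limit clause (and an offset when non-zero) is appended.
build_sql_query_string(["column_name__sort=asc"], page_size=50, offset=100)
# -> "order=column_name.asc&limit=50&offset=100"

# Comparators map to PostgREST operators, e.g. "greater" becomes gte.
build_sql_query_string(["month__greater=2023-01"])
# -> "month=gte.2023-01"

# Without page_size, no limit clause is emitted; the streaming CSV
# endpoints rely on this to export whole tables.
build_sql_query_string(["column_name__sort=asc"])
# -> "order=column_name.asc"
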
tests/test_query.py: 16 changes (16 additions, 0 deletions)
@@ -19,6 +19,22 @@ def test_query_build_sort_asc():
assert result == "order=column_name.asc&limit=50"


def test_query_build_sort_asc_without_limit():
query_str = ["column_name__sort=asc"]
result = build_sql_query_string(query_str)
assert result == "order=column_name.asc"


def test_query_build_sort_asc_with_page_in_query():
query_str = [
"column_name__sort=asc",
"page=2",
"page_size=20",
]
result = build_sql_query_string(query_str)
assert result == "order=column_name.asc"


def test_query_build_sort_desc():
query_str = ["column_name__sort=desc"]
result = build_sql_query_string(query_str, 50)