23 commits
19db90d
SNOW-1617614: asyncio network implementation, test set up (#2019)
sfc-gh-aling Aug 8, 2024
5f6d2da
SNOW-1348650: implement ocsp validation (#2025)
sfc-gh-aling Aug 16, 2024
10927e6
SNOW-1572213 implement result set iterator (#2052)
sfc-gh-aling Sep 11, 2024
74b1b87
SNOW-1572294: connection async api coverage (#2057)
sfc-gh-aling Sep 19, 2024
5f8f235
SNOW-1572316: async query timer bomb (#2059)
sfc-gh-aling Sep 24, 2024
bb3dc64
SNOW-1654538: asyncio download timeout setting (#2063)
sfc-gh-aling Sep 27, 2024
a753daa
SNOW-1572300: async cursor coverage (#2062)
sfc-gh-aling Sep 30, 2024
e687be4
Asyncio support for aws file transfer (#2031)
sfc-gh-yuwang Oct 9, 2024
9a85c62
SNOW-1572226: implement all authentication methods (#2064)
sfc-gh-aling Oct 11, 2024
a7f35b8
SNOW-1728340: support gcp and azure (#2067)
sfc-gh-yuwang Oct 15, 2024
b6e0a38
SNOW-1572304: asyncio add proxy support and test (#2066)
sfc-gh-aling Oct 17, 2024
d8935da
SNOW-1720699: fix network implementation and add unit test (#2071)
sfc-gh-aling Oct 17, 2024
fe6faae
SNOW-1628850: fix s3 accelerate logic (#2070)
sfc-gh-yuwang Oct 18, 2024
c5b4e47
SNOW-1654536: async binding stage bind upload agent (#2069)
sfc-gh-aling Oct 18, 2024
957c85c
SNOW-1740361: raise error below python less than 3 10 (#2075)
sfc-gh-aling Oct 23, 2024
4de4f55
SNOW-1759076: async for support in cursor get result batches (#2080)
sfc-gh-aling Oct 23, 2024
075fde0
SNOW-1757241: migrate all integ test (#2076)
sfc-gh-aling Oct 24, 2024
d13e4e2
SNOW-1617451: async telemetry support and client name change (#2077)
sfc-gh-yuwang Oct 25, 2024
b637a22
SNOW-1664063: sync main branch changes into async part (#2081)
sfc-gh-aling Oct 25, 2024
524e19f
SNOW-1572306: error experience in asyncio (#2082)
sfc-gh-aling Oct 25, 2024
91d68eb
SNOW-1708720: Error is raised at debug level when not closing connect…
sfc-gh-yuwang Oct 28, 2024
ffb507b
SNOW-1763960: clean up todos in code base (#2091)
sfc-gh-aling Oct 29, 2024
7cfff25
SNOW-1572311:add stress test (#2097)
sfc-gh-yuwang Nov 6, 2024
95 changes: 94 additions & 1 deletion .github/workflows/build_test.yml
@@ -5,13 +5,15 @@ on:
branches:
- master
- main
- dev/aio-connector
tags:
- v*
pull_request:
branches:
- master
- main
- prep-**
- dev/aio-connector
workflow_dispatch:
inputs:
logLevel:
@@ -332,10 +334,101 @@ jobs:
.coverage.py${{ env.shortver }}-lambda-ci
junit.py${{ env.shortver }}-lambda-ci-dev.xml

test-aio:
name: Test asyncio ${{ matrix.os.download_name }}-${{ matrix.python-version }}-${{ matrix.cloud-provider }}
needs: build
runs-on: ${{ matrix.os.image_name }}
strategy:
fail-fast: false
matrix:
os:
- image_name: ubuntu-latest
download_name: manylinux_x86_64
- image_name: macos-latest
download_name: macosx_x86_64
- image_name: windows-2019
download_name: win_amd64
python-version: ["3.10", "3.11", "3.12"]
cloud-provider: [aws, azure, gcp]
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Display Python version
run: python -c "import sys; print(sys.version)"
- name: Setup parameters file
shell: bash
env:
PARAMETERS_SECRET: ${{ secrets.PARAMETERS_SECRET }}
run: |
gpg --quiet --batch --yes --decrypt --passphrase="$PARAMETERS_SECRET" \
.github/workflows/parameters/public/parameters_${{ matrix.cloud-provider }}.py.gpg > test/parameters.py
- name: Download wheel(s)
uses: actions/download-artifact@v4
with:
name: ${{ matrix.os.download_name }}_py${{ matrix.python-version }}
path: dist
- name: Show wheels downloaded
run: ls -lh dist
shell: bash
- name: Upgrade setuptools, pip and wheel
run: python -m pip install -U setuptools pip wheel
- name: Install tox
run: python -m pip install tox>=4
- name: Run tests
run: python -m tox run -e aio
env:
PYTHON_VERSION: ${{ matrix.python-version }}
cloud_provider: ${{ matrix.cloud-provider }}
PYTEST_ADDOPTS: --color=yes --tb=short
TOX_PARALLEL_NO_SPINNER: 1
shell: bash
- name: Combine coverages
run: python -m tox run -e coverage --skip-missing-interpreters false
shell: bash
- uses: actions/upload-artifact@v4
with:
name: coverage_aio_${{ matrix.os.download_name }}-${{ matrix.python-version }}-${{ matrix.cloud-provider }}
path: |
.tox/.coverage
.tox/coverage.xml

test-unsupported-aio:
name: Test unsupported asyncio ${{ matrix.os.download_name }}-${{ matrix.python-version }}
runs-on: ${{ matrix.os.image_name }}
strategy:
fail-fast: false
matrix:
os:
- image_name: ubuntu-latest
download_name: manylinux_x86_64
python-version: [ "3.8", "3.9" ]
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Display Python version
run: python -c "import sys; print(sys.version)"
- name: Upgrade setuptools, pip and wheel
run: python -m pip install -U setuptools pip wheel
- name: Install tox
run: python -m pip install tox>=4
- name: Run tests
run: python -m tox run -e aio-unsupported-python
env:
PYTHON_VERSION: ${{ matrix.python-version }}
PYTEST_ADDOPTS: --color=yes --tb=short
TOX_PARALLEL_NO_SPINNER: 1
shell: bash

combine-coverage:
if: ${{ success() || failure() }}
name: Combine coverage
needs: [lint, test, test-fips, test-lambda]
needs: [lint, test, test-fips, test-lambda, test-aio]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
2 changes: 1 addition & 1 deletion ci/test_fips.sh
@@ -21,6 +21,6 @@ python -c "from cryptography.hazmat.backends.openssl import backend;print('Cryp
pip freeze

cd $CONNECTOR_DIR
pytest -vvv --cov=snowflake.connector --cov-report=xml:coverage.xml test
pytest -vvv --cov=snowflake.connector --cov-report=xml:coverage.xml test --ignore=test/integ/aio --ignore=test/unit/aio

deactivate
3 changes: 3 additions & 0 deletions setup.cfg
@@ -91,8 +91,11 @@ development =
pytest-timeout
pytest-xdist
pytzdata
pytest-asyncio
pandas =
pandas>=1.0.0,<3.0.0
pyarrow
secure-local-storage =
keyring>=23.1.0,<26.0.0
aio =
aiohttp
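
Reviewer note: the new `aio` extra above makes `aiohttp` an optional dependency for the asyncio code paths. A minimal sketch of guarding the import at runtime; the `pip install "snowflake-connector-python[aio]"` spelling in the comment is an assumption about how the extra would be consumed, not something shown in this diff:

```python
# Hypothetical guard: verify the optional "aio" extra (aiohttp) is installed
# before using the asyncio connector. The extra would typically be pulled in
# with: pip install "snowflake-connector-python[aio]"  (assumed spelling).
try:
    import aiohttp  # noqa: F401  -- declared under the new "aio" extra in setup.cfg
except ImportError as exc:
    raise RuntimeError(
        "The asyncio connector requires the 'aio' extra (aiohttp)."
    ) from exc
```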
20 changes: 20 additions & 0 deletions src/snowflake/connector/aio/__init__.py
@@ -0,0 +1,20 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

from ._connection import SnowflakeConnection
from ._cursor import DictCursor, SnowflakeCursor

__all__ = [
"SnowflakeConnection",
"SnowflakeCursor",
"DictCursor",
]


async def connect(**kwargs) -> SnowflakeConnection:
conn = SnowflakeConnection(**kwargs)
await conn.connect()
return conn
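
Reviewer note: for context, a minimal usage sketch of the new module-level `connect()` coroutine shown above. The credential values are placeholders, and the awaitable `execute()`, `fetchone()`, and `close()` calls are assumptions based on the async API mirroring the sync connector:

```python
import asyncio

from snowflake.connector.aio import connect


async def main() -> None:
    # connect() instantiates SnowflakeConnection and awaits its connect()
    # coroutine, as implemented in the diff above.
    conn = await connect(account="<account>", user="<user>", password="<password>")
    try:
        cur = conn.cursor()
        await cur.execute("select current_version()")  # assumed awaitable, mirroring the sync API
        print(await cur.fetchone())
    finally:
        await conn.close()  # assumed coroutine close(), mirroring the sync API


asyncio.run(main())
```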
210 changes: 210 additions & 0 deletions src/snowflake/connector/aio/_azure_storage_client.py
@@ -0,0 +1,210 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from __future__ import annotations

import json
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from logging import getLogger
from random import choice
from string import hexdigits
from typing import TYPE_CHECKING, Any

import aiohttp

from ..azure_storage_client import AzureCredentialFilter
from ..azure_storage_client import (
SnowflakeAzureRestClient as SnowflakeAzureRestClientSync,
)
from ..compat import quote
from ..constants import FileHeader, ResultStatus
from ..encryption_util import EncryptionMetadata
from ._storage_client import SnowflakeStorageClient as SnowflakeStorageClientAsync

if TYPE_CHECKING: # pragma: no cover
from ..file_transfer_agent import SnowflakeFileMeta, StorageCredential

from ..azure_storage_client import (
ENCRYPTION_DATA,
MATDESC,
TOKEN_EXPIRATION_ERR_MESSAGE,
)

logger = getLogger(__name__)

getLogger("aiohttp").addFilter(AzureCredentialFilter())


class SnowflakeAzureRestClient(
SnowflakeStorageClientAsync, SnowflakeAzureRestClientSync
):
def __init__(
self,
meta: SnowflakeFileMeta,
credentials: StorageCredential | None,
chunk_size: int,
stage_info: dict[str, Any],
use_s3_regional_url: bool = False,
) -> None:
SnowflakeAzureRestClientSync.__init__(
self,
meta=meta,
stage_info=stage_info,
chunk_size=chunk_size,
credentials=credentials,
)

async def _has_expired_token(self, response: aiohttp.ClientResponse) -> bool:
return response.status == 403 and any(
message in response.reason for message in TOKEN_EXPIRATION_ERR_MESSAGE
)

async def _send_request_with_authentication_and_retry(
self,
verb: str,
url: str,
retry_id: int | str,
headers: dict[str, Any] = None,
data: bytes = None,
) -> aiohttp.ClientResponse:
if not headers:
headers = {}

def generate_authenticated_url_and_rest_args() -> tuple[str, dict[str, Any]]:
curtime = datetime.now(timezone.utc).replace(tzinfo=None)
timestamp = curtime.strftime("YYYY-MM-DD")
sas_token = self.credentials.creds["AZURE_SAS_TOKEN"]
if sas_token and sas_token.startswith("?"):
sas_token = sas_token[1:]
if "?" in url:
_url = url + "&" + sas_token
else:
_url = url + "?" + sas_token
headers["Date"] = timestamp
rest_args = {"headers": headers}
if data:
rest_args["data"] = data
return _url, rest_args

return await self._send_request_with_retry(
verb, generate_authenticated_url_and_rest_args, retry_id
)

async def get_file_header(self, filename: str) -> FileHeader | None:
"""Gets Azure file properties."""
container_name = quote(self.azure_location.container_name)
path = quote(self.azure_location.path) + quote(filename)
meta = self.meta
# HTTP HEAD request
url = f"https://{self.storage_account}.blob.{self.endpoint}/{container_name}/{path}"
retry_id = "HEAD"
self.retry_count[retry_id] = 0
r = await self._send_request_with_authentication_and_retry(
"HEAD", url, retry_id
)
if r.status == 200:
meta.result_status = ResultStatus.UPLOADED
enc_data_str = r.headers.get(ENCRYPTION_DATA)
encryption_data = None if enc_data_str is None else json.loads(enc_data_str)
encryption_metadata = (
None
if not encryption_data
else EncryptionMetadata(
key=encryption_data["WrappedContentKey"]["EncryptedKey"],
iv=encryption_data["ContentEncryptionIV"],
matdesc=r.headers.get(MATDESC),
)
)
return FileHeader(
digest=r.headers.get("x-ms-meta-sfcdigest"),
content_length=int(r.headers.get("Content-Length")),
encryption_metadata=encryption_metadata,
)
elif r.status == 404:
meta.result_status = ResultStatus.NOT_FOUND_FILE
return FileHeader(
digest=None, content_length=None, encryption_metadata=None
)
else:
r.raise_for_status()

async def _initiate_multipart_upload(self) -> None:
self.block_ids = [
"".join(choice(hexdigits) for _ in range(20))
for _ in range(self.num_of_chunks)
]

async def _upload_chunk(self, chunk_id: int, chunk: bytes) -> None:
container_name = quote(self.azure_location.container_name)
path = quote(self.azure_location.path + self.meta.dst_file_name.lstrip("/"))

if self.num_of_chunks > 1:
block_id = self.block_ids[chunk_id]
url = (
f"https://{self.storage_account}.blob.{self.endpoint}/{container_name}/{path}?comp=block"
f"&blockid={block_id}"
)
headers = {"Content-Length": str(len(chunk))}
r = await self._send_request_with_authentication_and_retry(
"PUT", url, chunk_id, headers=headers, data=chunk
)
else:
# single request
azure_metadata = self._prepare_file_metadata()
url = f"https://{self.storage_account}.blob.{self.endpoint}/{container_name}/{path}"
headers = {
"x-ms-blob-type": "BlockBlob",
"Content-Encoding": "utf-8",
}
headers.update(azure_metadata)
r = await self._send_request_with_authentication_and_retry(
"PUT", url, chunk_id, headers=headers, data=chunk
)
r.raise_for_status() # expect status code 201

async def _complete_multipart_upload(self) -> None:
container_name = quote(self.azure_location.container_name)
path = quote(self.azure_location.path + self.meta.dst_file_name.lstrip("/"))
url = (
f"https://{self.storage_account}.blob.{self.endpoint}/{container_name}/{path}?comp"
f"=blocklist"
)
root = ET.Element("BlockList")
for block_id in self.block_ids:
part = ET.Element("Latest")
part.text = block_id
root.append(part)
headers = {"x-ms-blob-content-encoding": "utf-8"}
azure_metadata = self._prepare_file_metadata()
headers.update(azure_metadata)
retry_id = "COMPLETE"
self.retry_count[retry_id] = 0
r = await self._send_request_with_authentication_and_retry(
"PUT", url, "COMPLETE", headers=headers, data=ET.tostring(root)
)
r.raise_for_status() # expects status code 201

async def download_chunk(self, chunk_id: int) -> None:
container_name = quote(self.azure_location.container_name)
path = quote(self.azure_location.path + self.meta.src_file_name.lstrip("/"))
url = f"https://{self.storage_account}.blob.{self.endpoint}/{container_name}/{path}"
if self.num_of_chunks > 1:
chunk_size = self.chunk_size
if chunk_id < self.num_of_chunks - 1:
_range = f"{chunk_id * chunk_size}-{(chunk_id + 1) * chunk_size - 1}"
else:
_range = f"{chunk_id * chunk_size}-"
headers = {"Range": f"bytes={_range}"}
r = await self._send_request_with_authentication_and_retry(
"GET", url, chunk_id, headers=headers
) # expect 206
else:
# single request
r = await self._send_request_with_authentication_and_retry(
"GET", url, chunk_id
)
if r.status in (200, 206):
self.write_downloaded_chunk(chunk_id, await r.read())
r.raise_for_status()
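
Reviewer note: the Range header arithmetic in `download_chunk` above can be illustrated with a standalone sketch; the chunk size and chunk count below are arbitrary example values:

```python
def chunk_range_header(chunk_id: int, chunk_size: int, num_of_chunks: int) -> str:
    """Reproduce the Range header that download_chunk builds for multi-chunk files."""
    if chunk_id < num_of_chunks - 1:
        # Interior chunk: a closed byte range of exactly chunk_size bytes.
        byte_range = f"{chunk_id * chunk_size}-{(chunk_id + 1) * chunk_size - 1}"
    else:
        # Final chunk: open-ended range up to the end of the blob.
        byte_range = f"{chunk_id * chunk_size}-"
    return f"bytes={byte_range}"


# Example: a 25 MiB blob downloaded in 8 MiB chunks (4 chunks total).
assert chunk_range_header(0, 8 * 2**20, 4) == "bytes=0-8388607"
assert chunk_range_header(3, 8 * 2**20, 4) == "bytes=25165824-"
```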