Skip to content
This repository was archived by the owner on Sep 22, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/lint-flake8.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ jobs:

steps:
- uses: actions/checkout@v1
- name: Set up Python 3.6
- name: Set up Python 3.7
uses: actions/setup-python@v1
with:
python-version: 3.6
python-version: 3.7
- name: Install dependencies
run: |
python -m pip install -U pip setuptools
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/typecheck-mypy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ jobs:
else
echo "::add-matcher::.github/workflows/mypy-matcher.json"
fi
python -m mypy --no-color-output src/ai/backend || exit 0
python -m mypy --no-color-output src/ai/backend
8 changes: 0 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ stages:

# build matrix for test stage
python:
- "3.6"
- "3.7"
- "3.8"
os:
Expand Down Expand Up @@ -47,13 +46,6 @@ jobs:
username: "__token__"
password:
secure: "nXxyiMTQZgdLnpw+hZBm2nHtlMV9prg5bl+3lB4Q/pnWWaW4VvAU6U2Lw/gljAaD3jxOV+RWKOCdt6ZWmQc9M8Fh5mcTlq9IjcMgk0R39onsP7YP7UJUh7saqZG1EkruglCHwCjcz3XwmRyJ+GKIANDH6jRooEmGQt/b8sR0ZIuMxx9ANNPozGEIxcrEqkO2CT1NQzEYc969danjYoyRImDUyDLKTJKd5ZkC7vwmT9z1chm0oxbZMdBJbL26g3TEr7dq1gQAiiLB5lhFVxklWqYlthlWl5qvmtgcn9ZNh1OA2WF8jTwDaafXoYHOotfq82ASRZI3dOckJQM6bsEJEPh5tTIvJJNxMmPTomHCRmc8/sNfOOoPPTLhjXVGE1BxL4u3DXZt0VAw80mkQseXu9wtzNEdZqCxGlSzycyut4cLtXpWXZDN/zqDYczAPUAYeRi2XbxT06OHhczmtn7WPGp2O/HYrXzHrMjAho0tNdch/62hJycEYAMRN0iQSnB2Gs2Ja7h6WUmf6lw2P4qS8gOSKuBJ3Z5Q0glbS2m28oCDZjP6zBqCwYucMZfUqF/aKiVei0NQp1dvjKUBqMJVogesuOAvtDVo+wN3rp2pcTntEKJHqYbNL9fOzwErJM8r/ZUMGC0HkdyTcnPS7uGkRF5WlzFl1cVBNmHzburc+N4="
allow_failures:
- python: "3.8"
os: linux
fast_finish: true
# exclude the duplicate default test stage
exclude:
- python: "3.6"

notifications:
webhooks:
Expand Down
2 changes: 0 additions & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
version: 1.0.dev{build}
environment:
matrix:
- PYTHON: "C:\\Python36"
- PYTHON: "C:\\Python36-x64"
- PYTHON: "C:\\Python37"
- PYTHON: "C:\\Python37-x64"
- PYTHON: "C:\\Python38"
Expand Down
1 change: 1 addition & 0 deletions changes/97.breaking
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Drop support for Python 3.6
1 change: 1 addition & 0 deletions changes/97.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support APIv5's new GraphQL schema and kernel/session naming changes
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ norecursedirs = venv virtualenv .git
timeout = 5
markers =
integration: Test cases that require real manager (and agents) to be running on http://localhost:8081.

[mypy]
ignore_missing_imports = true
namespace_packages = true
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
'tabulate~=0.8.6',
'tqdm~=4.42',
'yarl~=1.4.2',
'typing-extensions~=3.7.4',
]
build_requires = [
'wheel>=0.34.2',
Expand Down Expand Up @@ -82,7 +83,6 @@ def read_src_version():
'Intended Audience :: Developers',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Operating System :: POSIX',
Expand All @@ -94,7 +94,7 @@ def read_src_version():
],
package_dir={'': 'src'},
packages=find_namespace_packages(where='src', include='ai.backend.*'),
python_requires='>=3.6',
python_requires='>=3.7',
setup_requires=setup_requires,
install_requires=install_requires,
extras_require={
Expand Down
70 changes: 51 additions & 19 deletions src/ai/backend/client/cli/admin/images.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import json
import sys

import click
from tabulate import tabulate
from tqdm import tqdm

from . import admin
from ...session import Session
from ...compat import asyncio_run
from ...session import Session, AsyncSession
from ..pretty import print_done, print_warn, print_fail, print_error


@admin.command()
@click.option('--operation', is_flag=True, help='Get operational images only')
def images(operation):
'''
def images(operation: bool) -> None:
"""
Show the list of registered images in this cluster.
'''
"""
fields = [
('Name', 'name'),
('Registry', 'registry'),
Expand All @@ -32,33 +36,61 @@ def images(operation):
print_warn('There are no registered images.')
return
print(tabulate((item.values() for item in items),
headers=(item[0] for item in fields),
headers=[item[0] for item in fields],
floatfmt=',.0f'))


@admin.command()
@click.option('-r', '--registry', type=str, default=None,
help='The name (usually hostname or "lablup") '
'of the Docker registry configured.')
def rescan_images(registry):
'''Update the kernel image metadata from all configured docker registries.'''
with Session() as session:
try:
result = session.Image.rescan_images(registry)
except Exception as e:
print_error(e)
sys.exit(1)
if result['ok']:
print_done("Updated the image metadata from the configured registries.")
else:
print_fail(f"Rescanning has failed: {result['msg']}")
def rescan_images(registry: str) -> None:
"""
Update the kernel image metadata from all configured docker registries.
"""

async def rescan_images_impl(registry: str) -> None:
async with AsyncSession() as session:
try:
result = await session.Image.rescan_images(registry)
except Exception as e:
print_error(e)
sys.exit(1)
if not result['ok']:
print_fail(f"Failed to begin registry scanning: {result['msg']}")
sys.exit(1)
print_done("Started updating the image metadata from the configured registries.")
task_id = result['task_id']
bgtask = session.BackgroundTask(task_id)
try:
completion_msg_func = lambda: print_done("Finished registry scanning.")
with tqdm(unit='image') as pbar:
async with bgtask.listen_events() as response:
async for ev in response:
data = json.loads(ev.data)
if ev.event == 'task_updated':
pbar.total = data['total_progress']
pbar.write(data['message'])
pbar.update(data['current_progress'] - pbar.n)
elif ev.event == 'task_failed':
error_msg = data['message']
completion_msg_func = \
lambda: print_fail(f"Error occurred: {error_msg}")
elif ev.event == 'task_cancelled':
completion_msg_func = \
lambda: print_warn("Registry scanning has been "
"cancelled in the middle.")
finally:
completion_msg_func()

asyncio_run(rescan_images_impl(registry))


@admin.command()
@click.argument('alias', type=str)
@click.argument('target', type=str)
def alias_image(alias, target):
'''Add an image alias.'''
"""Add an image alias."""
with Session() as session:
try:
result = session.Image.alias_image(alias, target)
Expand All @@ -74,7 +106,7 @@ def alias_image(alias, target):
@admin.command()
@click.argument('alias', type=str)
def dealias_image(alias):
'''Remove an image alias.'''
"""Remove an image alias."""
with Session() as session:
try:
result = session.Image.dealias_image(alias)
Expand Down
8 changes: 5 additions & 3 deletions src/ai/backend/client/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class ProxyRunnerContext:
protocol: str
host: str
port: int
args: Dict[str, str]
args: Dict[str, Union[None, str, List[str]]]
envs: Dict[str, str]
api_session: Optional[AsyncSession]
local_server: Optional[asyncio.AbstractServer]
Expand All @@ -150,7 +150,7 @@ def __init__(self, host: str, port: int,
self.exit_code = 0

self.args, self.envs = {}, {}
if len(args) > 0:
if args is not None and len(args) > 0:
for argline in args:
tokens = []
for token in shlex.shlex(argline,
Expand All @@ -168,7 +168,7 @@ def __init__(self, host: str, port: int,
self.args[tokens[0]] = tokens[1]
else:
self.args[tokens[0]] = tokens[1:]
if len(envs) > 0:
if envs is not None and len(envs) > 0:
for envline in envs:
split = envline.strip().split('=', maxsplit=2)
if len(split) == 2:
Expand All @@ -178,6 +178,7 @@ def __init__(self, host: str, port: int,

async def handle_connection(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
assert self.api_session is not None
p = WSProxy(self.api_session, self.session_name,
self.app_name, self.protocol,
self.args, self.envs,
Expand Down Expand Up @@ -232,6 +233,7 @@ async def __aexit__(self, *exc_info) -> None:
print_info("Shutting down....")
self.local_server.close()
await self.local_server.wait_closed()
assert self.api_session is not None
await self.api_session.__aexit__(*exc_info)
assert self.api_session.closed
if self.local_server is not None:
Expand Down
20 changes: 13 additions & 7 deletions src/ai/backend/client/cli/proxy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from __future__ import annotations

import asyncio
import json
import re
from typing import (
Union,
Tuple,
AsyncIterator,
)

import aiohttp
from aiohttp import web
Expand All @@ -19,6 +26,8 @@ class WebSocketProxy:
'upstream_buffer', 'upstream_buffer_task',
)

upstream_buffer: asyncio.Queue[Tuple[Union[str, bytes], aiohttp.WSMsgType]]

def __init__(self, up_conn: aiohttp.ClientWebSocketResponse,
down_conn: web.WebSocketResponse):
self.up_conn = up_conn
Expand Down Expand Up @@ -182,18 +191,15 @@ async def websocket_handler(request):
reason="Internal Server Error")


async def startup_proxy(app):
async def proxy_context(app: web.Application) -> AsyncIterator[None]:
app['client_session'] = AsyncSession()


async def cleanup_proxy(app):
await app['client_session'].close()
async with app['client_session']:
yield


def create_proxy_app():
app = web.Application()
app.on_startup.append(startup_proxy)
app.on_cleanup.append(cleanup_proxy)
app.cleanup_ctx.append(proxy_context)

app.router.add_route("GET", r'/stream/{path:.*$}', websocket_handler)
app.router.add_route("GET", r'/wsproxy/{path:.*$}', websocket_handler)
Expand Down
10 changes: 5 additions & 5 deletions src/ai/backend/client/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ..compat import asyncio_run, current_loop
from ..exceptions import BackendError, BackendAPIError
from ..session import Session, AsyncSession, is_legacy_server
from ..utils import undefined
from ..types import undefined
from .pretty import (
print_info, print_wait, print_done, print_error, print_fail, print_warn,
format_info,
Expand Down Expand Up @@ -881,7 +881,7 @@ def start(image, name, owner, # base args
help='Set the owner of the target session explicitly.')
# job scheduling options
@click.option('--type', 'type_', metavar='SESSTYPE',
type=click.Choice(['batch', 'interactive', undefined]),
type=click.Choice(['batch', 'interactive', undefined]), # type: ignore
default=undefined,
help='Either batch or interactive')
@click.option('-i', '--image', default=undefined,
Expand Down Expand Up @@ -1145,9 +1145,9 @@ def events(name, owner_access_key):
async def _run_events():
async with AsyncSession() as session:
compute_session = session.ComputeSession(name, owner_access_key)
async with compute_session.stream_events() as sse_response:
async for ev in sse_response.fetch_events():
print(click.style(ev['event'], fg='cyan', bold=True), json.loads(ev['data']))
async with compute_session.listen_events() as response:
async for ev in response:
print(click.style(ev.event, fg='cyan', bold=True), json.loads(ev.data))

try:
asyncio_run(_run_events())
Expand Down
15 changes: 11 additions & 4 deletions src/ai/backend/client/compat.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
'''
"""
A compatibility module for backported codes from Python 3.6+ standard library.
'''
"""

import asyncio

__all__ = (
'current_loop',
'all_tasks',
'asyncio_run',
'asyncio_run_forever',
)


if hasattr(asyncio, 'get_running_loop'): # Python 3.7+
current_loop = asyncio.get_running_loop
Expand Down Expand Up @@ -60,11 +67,11 @@ def _asyncio_run(coro, *, debug=False):


def asyncio_run_forever(server_context, *, debug=False):
'''
"""
A proposed-but-not-implemented asyncio.run_forever() API based on
@vxgmichel's idea.
See discussions on https://github.com/python/asyncio/pull/465
'''
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(debug)
Expand Down
Loading