Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
branch = True
source =
jsonrpc_websocket

[report]
show_missing = True
46 changes: 34 additions & 12 deletions jsonrpc_websocket/jsonrpc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json

import aiohttp
from aiohttp import ClientError
Expand Down Expand Up @@ -64,22 +65,43 @@ async def _ws_loop(self):
try:
while True:
msg = await self._client.receive()
if msg.type == aiohttp.WSMsgType.TEXT:

if msg.type == aiohttp.WSMsgType.CLOSED:
break
if msg.type == aiohttp.WSMsgType.ERROR:
break

if msg.type == aiohttp.WSMsgType.BINARY:
try:
# If we get a binary message, try and decode it as a
# UTF-8 JSON string, in case the server is sending
# binary websocket messages. If it doens't decode we'll
# ignore it since we weren't expecting binary messages
# anyway
data = json.loads(msg.data.decode())
except ValueError:
continue
elif msg.type == aiohttp.WSMsgType.TEXT:
try:
data = msg.json()
except ValueError as exc:
raise TransportError('Error Parsing JSON', None, exc)
if 'method' in data:
request = jsonrpc_base.Request.parse(data)
response = self.receive_request(request)
if response:
await self.send_message(response)
else:
self._pending_messages[data['id']].response = data
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
else:
# This is tested with test_message_ping_ignored, but
# cpython's optimizations prevent coveragepy from detecting
# that it's run
# https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered
continue # pragma: no cover


if 'method' in data:
request = jsonrpc_base.Request.parse(data)
response = self.receive_request(request)
if response:
await self.send_message(response)
else:
self._pending_messages[data['id']].response = data

except (ClientError, HttpProcessingError, asyncio.TimeoutError) as exc:
raise TransportError('Transport Error', None, exc)
finally:
Expand Down
57 changes: 54 additions & 3 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ async def send_str(self, data):
def test_receive(self, data):
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.TEXT, data, ''))

def test_binary(self):
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.BINARY, 0, ''))
def test_binary(self, data=bytes()):
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.BINARY, data, ''))

def test_error(self):
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.ERROR, 0, ''))

def test_close(self):
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.CLOSED, 0, ''))

def test_ping(self):
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.PING, 0, ''))

async def receive(self):
value = await self.receive_queue.get()
if self.receive_side_effect:
Expand Down Expand Up @@ -78,7 +84,9 @@ def setUp(self):
self.ws_loop_future = self.loop.run_until_complete(self.server.ws_connect())

def tearDown(self):
self.loop.run_until_complete(self.server.close())
if self.server.connected:
self.client.test_server.test_close()
self.loop.run_until_complete(self.ws_loop_future)
teardown_test_loop(self.loop)

@property
Expand All @@ -92,6 +100,9 @@ def handler(self, value):
def receive(self, data):
self.client.test_server.test_receive(data)

def receive_binary(self, data):
self.client.test_server.test_binary(data)

def test_pep8_conformance(self):
"""Test that we conform to PEP8."""

Expand Down Expand Up @@ -157,6 +168,30 @@ async def test_message_not_json(self):
await self.ws_loop_future
self.assertIsInstance(transport_error.exception.args[1], ValueError)

@unittest_run_loop
async def test_message_binary_not_utf8(self):
# If we get a binary message, we should try to decode it as JSON, but
# if it's not valid we should just ignore it, and an exception should
# not be thrown
self.receive_binary(bytes((0xE0, 0x80, 0x80)))
self.client.test_server.test_close()
await self.ws_loop_future

@unittest_run_loop
async def test_message_binary_not_json(self):
# If we get a binary message, we should try to decode it as JSON, but
# if it's not valid we should just ignore it, and an exception should
# not be thrown
self.receive_binary('not json'.encode())
self.client.test_server.test_close()
await self.ws_loop_future

@unittest_run_loop
async def test_message_ping_ignored(self):
self.client.test_server.test_ping()
self.client.test_server.test_close()
await self.ws_loop_future

@unittest_run_loop
async def test_connection_timeout(self):
def bad_connect():
Expand All @@ -180,6 +215,22 @@ def handler(server, data):

self.receive('{"jsonrpc": "2.0", "method": "test_method", "id": 1}')

@unittest_run_loop
async def test_server_request_binary(self):
# Test that if the server sends a binary websocket message, that's a
# UTF-8 encoded JSON request we process it
def test_method():
return 1
self.server.test_method = test_method

def handler(server, data):
response = json.loads(data)
self.assertEqual(response["result"], 1)

self.handler = handler

self.receive_binary('{"jsonrpc": "2.0", "method": "test_method", "id": 1}'.encode())

@unittest_run_loop
async def test_server_notification(self):
def test_method():
Expand Down