Skip to content

Commit a7705db

Browse files
shiakyemlove
andauthored
added support for JSONRPC libraries sending WS Message as binary (#5)
* added support for JSONRPC libraries sending WS Message as binary * Add tests for binary websocket messages * Refactor to clean up the websocket loop Co-authored-by: Emily Mills <[email protected]>
1 parent 7d81002 commit a7705db

File tree

3 files changed

+91
-15
lines changed

3 files changed

+91
-15
lines changed

.coveragerc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
branch = True
33
source =
44
jsonrpc_websocket
5+
6+
[report]
7+
show_missing = True

jsonrpc_websocket/jsonrpc.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import json
23

34
import aiohttp
45
from aiohttp import ClientError
@@ -64,22 +65,43 @@ async def _ws_loop(self):
6465
try:
6566
while True:
6667
msg = await self._client.receive()
67-
if msg.type == aiohttp.WSMsgType.TEXT:
68+
69+
if msg.type == aiohttp.WSMsgType.CLOSED:
70+
break
71+
if msg.type == aiohttp.WSMsgType.ERROR:
72+
break
73+
74+
if msg.type == aiohttp.WSMsgType.BINARY:
75+
try:
76+
# If we get a binary message, try and decode it as a
77+
# UTF-8 JSON string, in case the server is sending
78+
# binary websocket messages. If it doens't decode we'll
79+
# ignore it since we weren't expecting binary messages
80+
# anyway
81+
data = json.loads(msg.data.decode())
82+
except ValueError:
83+
continue
84+
elif msg.type == aiohttp.WSMsgType.TEXT:
6885
try:
6986
data = msg.json()
7087
except ValueError as exc:
7188
raise TransportError('Error Parsing JSON', None, exc)
72-
if 'method' in data:
73-
request = jsonrpc_base.Request.parse(data)
74-
response = self.receive_request(request)
75-
if response:
76-
await self.send_message(response)
77-
else:
78-
self._pending_messages[data['id']].response = data
79-
elif msg.type == aiohttp.WSMsgType.CLOSED:
80-
break
81-
elif msg.type == aiohttp.WSMsgType.ERROR:
82-
break
89+
else:
90+
# This is tested with test_message_ping_ignored, but
91+
# cpython's optimizations prevent coveragepy from detecting
92+
# that it's run
93+
# https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered
94+
continue # pragma: no cover
95+
96+
97+
if 'method' in data:
98+
request = jsonrpc_base.Request.parse(data)
99+
response = self.receive_request(request)
100+
if response:
101+
await self.send_message(response)
102+
else:
103+
self._pending_messages[data['id']].response = data
104+
83105
except (ClientError, HttpProcessingError, asyncio.TimeoutError) as exc:
84106
raise TransportError('Transport Error', None, exc)
85107
finally:

tests.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,18 @@ async def send_str(self, data):
4242
def test_receive(self, data):
4343
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.TEXT, data, ''))
4444

45-
def test_binary(self):
46-
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.BINARY, 0, ''))
45+
def test_binary(self, data=bytes()):
46+
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.BINARY, data, ''))
4747

4848
def test_error(self):
4949
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.ERROR, 0, ''))
5050

51+
def test_close(self):
52+
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.CLOSED, 0, ''))
53+
54+
def test_ping(self):
55+
self.receive_queue.put_nowait(aiohttp.WSMessage(aiohttp.WSMsgType.PING, 0, ''))
56+
5157
async def receive(self):
5258
value = await self.receive_queue.get()
5359
if self.receive_side_effect:
@@ -78,7 +84,9 @@ def setUp(self):
7884
self.ws_loop_future = self.loop.run_until_complete(self.server.ws_connect())
7985

8086
def tearDown(self):
81-
self.loop.run_until_complete(self.server.close())
87+
if self.server.connected:
88+
self.client.test_server.test_close()
89+
self.loop.run_until_complete(self.ws_loop_future)
8290
teardown_test_loop(self.loop)
8391

8492
@property
@@ -92,6 +100,9 @@ def handler(self, value):
92100
def receive(self, data):
93101
self.client.test_server.test_receive(data)
94102

103+
def receive_binary(self, data):
104+
self.client.test_server.test_binary(data)
105+
95106
def test_pep8_conformance(self):
96107
"""Test that we conform to PEP8."""
97108

@@ -157,6 +168,30 @@ async def test_message_not_json(self):
157168
await self.ws_loop_future
158169
self.assertIsInstance(transport_error.exception.args[1], ValueError)
159170

171+
@unittest_run_loop
172+
async def test_message_binary_not_utf8(self):
173+
# If we get a binary message, we should try to decode it as JSON, but
174+
# if it's not valid we should just ignore it, and an exception should
175+
# not be thrown
176+
self.receive_binary(bytes((0xE0, 0x80, 0x80)))
177+
self.client.test_server.test_close()
178+
await self.ws_loop_future
179+
180+
@unittest_run_loop
181+
async def test_message_binary_not_json(self):
182+
# If we get a binary message, we should try to decode it as JSON, but
183+
# if it's not valid we should just ignore it, and an exception should
184+
# not be thrown
185+
self.receive_binary('not json'.encode())
186+
self.client.test_server.test_close()
187+
await self.ws_loop_future
188+
189+
@unittest_run_loop
190+
async def test_message_ping_ignored(self):
191+
self.client.test_server.test_ping()
192+
self.client.test_server.test_close()
193+
await self.ws_loop_future
194+
160195
@unittest_run_loop
161196
async def test_connection_timeout(self):
162197
def bad_connect():
@@ -180,6 +215,22 @@ def handler(server, data):
180215

181216
self.receive('{"jsonrpc": "2.0", "method": "test_method", "id": 1}')
182217

218+
@unittest_run_loop
219+
async def test_server_request_binary(self):
220+
# Test that if the server sends a binary websocket message, that's a
221+
# UTF-8 encoded JSON request we process it
222+
def test_method():
223+
return 1
224+
self.server.test_method = test_method
225+
226+
def handler(server, data):
227+
response = json.loads(data)
228+
self.assertEqual(response["result"], 1)
229+
230+
self.handler = handler
231+
232+
self.receive_binary('{"jsonrpc": "2.0", "method": "test_method", "id": 1}'.encode())
233+
183234
@unittest_run_loop
184235
async def test_server_notification(self):
185236
def test_method():

0 commit comments

Comments
 (0)