Skip to content

Commit 73bd4d8

Browse files
committed
refactor: always use multiprocess spawn
1 parent 4fdc9fa commit 73bd4d8

File tree

6 files changed

+81
-71
lines changed

6 files changed

+81
-71
lines changed

pytest-embedded/pytest_embedded/__init__.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,52 @@
11
"""A pytest plugin that designed for embedded testing."""
22

3-
from .app import App
4-
from .dut import Dut
5-
from .dut_factory import DutFactory
3+
import multiprocessing.queues
4+
from multiprocessing.managers import BaseManager
5+
from typing import AnyStr
6+
7+
MP_CTX = multiprocessing.get_context('spawn')
8+
9+
10+
class MessageQueueManager(BaseManager):
11+
pass
12+
13+
14+
class MessageQueue(multiprocessing.queues.Queue):
15+
def __init__(self, *args, **kwargs):
16+
super().__init__(*args, ctx=MP_CTX, **kwargs)
17+
18+
def put(self, obj, **kwargs):
19+
from .utils import to_bytes
20+
21+
if not isinstance(obj, str | bytes):
22+
super().put(obj, **kwargs)
23+
return
24+
25+
if obj == '' or obj == b'':
26+
return
27+
28+
_b = to_bytes(obj)
29+
try:
30+
super().put(_b)
31+
except Exception: # queue might be closed
32+
pass
33+
34+
def write(self, s: AnyStr):
35+
self.put(s)
36+
37+
def flush(self):
38+
pass
39+
40+
def isatty(self):
41+
return True
42+
43+
44+
MessageQueueManager.register('MessageQueue', MessageQueue)
45+
46+
47+
from .app import App # noqa: E402
48+
from .dut import Dut # noqa: E402
49+
from .dut_factory import DutFactory # noqa: E402
650

751
__all__ = ['App', 'Dut', 'DutFactory']
852

pytest-embedded/pytest_embedded/dut.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import functools
22
import logging
3-
import multiprocessing
43
import os.path
54
import re
65
from collections.abc import Callable
@@ -10,7 +9,7 @@
109
import pexpect
1110

1211
from .app import App
13-
from .log import PexpectProcess
12+
from .log import MessageQueue, PexpectProcess
1413
from .unity import UNITY_SUMMARY_LINE_REGEX, TestSuite
1514
from .utils import Meta, _InjectMixinCls, remove_asci_color_code, to_bytes, to_list
1615

@@ -29,7 +28,7 @@ class Dut(_InjectMixinCls):
2928
def __init__(
3029
self,
3130
pexpect_proc: PexpectProcess,
32-
msg_queue: multiprocessing.Queue,
31+
msg_queue: MessageQueue,
3332
app: App,
3433
pexpect_logfile: str,
3534
test_case_name: str,

pytest-embedded/pytest_embedded/dut_factory.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from pytest_embedded_serial import Serial
2020
from pytest_embedded_wokwi import Wokwi
2121

22-
from . import App, Dut
22+
from . import MP_CTX, App, Dut, MessageQueueManager
2323
from .log import MessageQueue, PexpectProcess
2424
from .utils import FIXTURES_SERVICES, ClassCliOptions, to_str
2525

@@ -28,11 +28,6 @@ def _drop_none_kwargs(kwargs: dict[t.Any, t.Any]):
2828
return {k: v for k, v in kwargs.items() if v is not None}
2929

3030

31-
if sys.platform == 'darwin':
32-
_ctx = multiprocessing.get_context('fork')
33-
else:
34-
_ctx = multiprocessing.get_context()
35-
3631
_stdout = sys.__stdout__
3732

3833

@@ -45,10 +40,6 @@ def _drop_none_kwargs(kwargs: dict[t.Any, t.Any]):
4540
PARAMETRIZED_FIXTURES_CACHE = {}
4641

4742

48-
def msg_queue_gn() -> MessageQueue:
49-
return MessageQueue()
50-
51-
5243
def _listen(q: MessageQueue, filepath: str, with_timestamp: bool = True, count: int = 1, total: int = 1) -> None:
5344
shall_add_prefix = True
5445
while True:
@@ -94,7 +85,7 @@ def _listener_gn(msg_queue, _pexpect_logfile, with_timestamp, dut_index, dut_tot
9485
'total': dut_total,
9586
}
9687

97-
return _ctx.Process(
88+
return MP_CTX.Process(
9889
target=_listen,
9990
args=(
10091
msg_queue,
@@ -742,7 +733,8 @@ def create(
742733
layout = []
743734
try:
744735
global PARAMETRIZED_FIXTURES_CACHE
745-
msg_queue = msg_queue_gn()
736+
737+
msg_queue = MessageQueueManager().MessageQueue()
746738
layout.append(msg_queue)
747739

748740
_pexpect_logfile = os.path.join(

pytest-embedded/pytest_embedded/log.py

Lines changed: 11 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,18 @@
11
import errno
22
import logging
3-
import multiprocessing
43
import os
54
import subprocess
6-
import sys
75
import tempfile
86
import textwrap
97
import uuid
10-
from multiprocessing import queues
118
from typing import AnyStr
129

1310
import pexpect.fdpexpect
1411
from pexpect import EOF, TIMEOUT
1512
from pexpect.utils import poll_ignore_interrupts, select_ignore_interrupts
1613

17-
from .utils import Meta, remove_asci_color_code, to_bytes, to_str, utcnow_str
18-
19-
if sys.platform == 'darwin':
20-
_ctx = multiprocessing.get_context('fork')
21-
else:
22-
_ctx = multiprocessing.get_context()
23-
24-
25-
class MessageQueue(queues.Queue):
26-
def __init__(self, *args, **kwargs):
27-
if 'ctx' not in kwargs:
28-
kwargs['ctx'] = _ctx
29-
30-
super().__init__(*args, **kwargs)
31-
32-
def put(self, obj, **kwargs):
33-
if not isinstance(obj, str | bytes):
34-
super().put(obj, **kwargs)
35-
return
36-
37-
if obj == '' or obj == b'':
38-
return
39-
40-
_b = to_bytes(obj)
41-
try:
42-
super().put(_b, **kwargs)
43-
except: # noqa # queue might be closed
44-
pass
45-
46-
def write(self, s: AnyStr):
47-
self.put(s)
48-
49-
def flush(self):
50-
pass
51-
52-
def isatty(self):
53-
return True
14+
from pytest_embedded import MP_CTX, MessageQueue
15+
from pytest_embedded.utils import Meta, remove_asci_color_code, to_bytes, to_str, utcnow_str
5416

5517

5618
class PexpectProcess(pexpect.fdpexpect.fdspawn):
@@ -144,18 +106,18 @@ def live_print_call(*args, msg_queue: MessageQueue | None = None, expect_returnc
144106
raise subprocess.CalledProcessError(process.returncode, process.args)
145107

146108

147-
class _PopenRedirectProcess(_ctx.Process):
109+
class _PopenRedirectProcess(MP_CTX.Process):
148110
def __init__(self, msg_queue: MessageQueue, logfile: str):
149-
self._q = msg_queue
150-
151-
self.logfile = logfile
152-
153-
super().__init__(target=self._forward_io, daemon=True) # killed by the main process
111+
super().__init__(target=self._forward_io, args=(msg_queue, logfile), daemon=True)
154112

155-
def _forward_io(self) -> None:
156-
with open(self.logfile) as fr:
113+
@staticmethod
114+
def _forward_io(msg_queue, logfile) -> None:
115+
with open(logfile) as fr:
157116
while True:
158-
self._q.put(fr.read())
117+
try:
118+
msg_queue.put(fr.read()) # msg_queue may be closed
119+
except Exception:
120+
break
159121

160122

161123
class DuplicateStdoutPopen(subprocess.Popen):

pytest-embedded/pytest_embedded/plugin.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from _pytest.main import Session
2727
from _pytest.python import Function
2828

29+
from . import MessageQueueManager
2930
from .app import App
3031
from .dut import Dut
3132
from .dut_factory import (
@@ -36,7 +37,6 @@
3637
app_fn,
3738
dut_gn,
3839
gdb_gn,
39-
msg_queue_gn,
4040
openocd_gn,
4141
pexpect_proc_fn,
4242
qemu_gn,
@@ -460,6 +460,10 @@ def _close_or_terminate(obj):
460460
if isinstance(obj, subprocess.Popen | multiprocessing.process.BaseProcess):
461461
obj.terminate()
462462
obj.kill()
463+
elif isinstance(obj, MessageQueue):
464+
obj.close()
465+
obj.cancel_join_thread()
466+
obj.join_thread()
463467
elif isinstance(obj, io.IOBase):
464468
try:
465469
obj.close()
@@ -630,6 +634,16 @@ def port_app_cache() -> dict[str, str]:
630634
return {}
631635

632636

637+
@pytest.fixture(scope='session')
638+
def _mp_manager():
639+
manager = MessageQueueManager()
640+
manager.start()
641+
642+
yield manager
643+
644+
manager.shutdown()
645+
646+
633647
@pytest.fixture
634648
def test_case_tempdir(test_case_name: str, session_tempdir: str) -> str:
635649
"""Function scoped temp dir for pytest-embedded"""
@@ -668,8 +682,8 @@ def _pexpect_logfile(test_case_tempdir, logfile_extension, dut_index, dut_total)
668682

669683
@pytest.fixture
670684
@multi_dut_generator_fixture
671-
def msg_queue() -> MessageQueue: # kwargs passed by `multi_dut_generator_fixture()`
672-
return msg_queue_gn()
685+
def msg_queue(_mp_manager) -> MessageQueue: # kwargs passed by `multi_dut_generator_fixture()`
686+
return _mp_manager.MessageQueue()
673687

674688

675689
@pytest.fixture

pytest-embedded/tests/test_base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,6 @@ def test_expect_all_failed(dut):
290290
result.assert_outcomes(passed=10)
291291

292292

293-
@pytest.mark.xfail(reason='unstable')
294293
def test_expect_from_timeout(testdir):
295294
testdir.makepyfile(r"""
296295
import threading

0 commit comments

Comments
 (0)