Skip to content

Commit c6beefd

Browse files
committed
Restore kqueue after daemonize because it's not inherited on fork()
1 parent c9c4076 commit c6beefd

File tree

7 files changed

+79
-24
lines changed

7 files changed

+79
-24
lines changed

supervisor/options.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from supervisor import loggers
5151
from supervisor import states
5252
from supervisor import xmlrpc
53+
from supervisor import poller
5354

5455
mydir = os.path.abspath(os.path.dirname(__file__))
5556
version_txt = os.path.join(mydir, 'version.txt')
@@ -431,6 +432,7 @@ def __init__(self):
431432
self.process_group_configs = []
432433
self.parse_warnings = []
433434
self.signal_receiver = SignalReceiver()
435+
self.poller = poller.Poller(self)
434436

435437
def version(self, dummy):
436438
"""Print version to stdout and exit(0).
@@ -982,6 +984,11 @@ def server_configs_from_parser(self, parser):
982984
return configs
983985

984986
def daemonize(self):
987+
self.poller.before_daemonize()
988+
self._daemonize()
989+
self.poller.after_daemonize()
990+
991+
def _daemonize(self):
985992
# To daemonize, we need to become the leader of our own session
986993
# (process) group. If we do not, signals sent to our
987994
# parent process will also be sent to us. This might be bad because

supervisor/poller.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ def unregister(self, fd):
2424
def poll(self, timeout):
2525
raise NotImplementedError
2626

27+
def before_daemonize(self):
28+
pass
29+
30+
def after_daemonize(self):
31+
pass
32+
33+
2734
class SelectPoller(BasePoller):
2835

2936
def initialize(self):
@@ -120,13 +127,17 @@ class KQueuePoller(BasePoller):
120127

121128
def initialize(self):
122129
self._kqueue = select.kqueue()
130+
self.readables = set()
131+
self.writables = set()
123132

124133
def register_readable(self, fd):
134+
self.readables.add(fd)
125135
kevent = select.kevent(fd, filter=select.KQ_FILTER_READ,
126136
flags=select.KQ_EV_ADD)
127137
self._kqueue_control(fd, kevent)
128138

129139
def register_writable(self, fd):
140+
self.writables.add(fd)
130141
kevent = select.kevent(fd, filter=select.KQ_FILTER_WRITE,
131142
flags=select.KQ_EV_ADD)
132143
self._kqueue_control(fd, kevent)
@@ -165,6 +176,16 @@ def poll(self, timeout):
165176

166177
return readables, writables
167178

179+
def before_daemonize(self):
180+
self._kqueue.close()
181+
self._kqueue = None
182+
183+
def after_daemonize(self):
184+
self._kqueue = select.kqueue()
185+
for fd in self.readables:
186+
self.register_readable(fd)
187+
for fd in self.writables:
188+
self.register_writable(fd)
168189

169190
def implements_poll():
170191
return hasattr(select, 'poll')

supervisor/supervisord.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
from supervisor import events
4444
from supervisor.states import SupervisorStates
4545
from supervisor.states import getProcessStateDescription
46-
from supervisor.poller import Poller
4746

4847
class Supervisor:
4948
stopping = False # set after we detect that we are handling a stop request
@@ -53,7 +52,6 @@ class Supervisor:
5352

5453
def __init__(self, options):
5554
self.options = options
56-
self.poller = Poller(options)
5755
self.process_groups = {}
5856
self.ticks = {}
5957

@@ -210,11 +208,11 @@ def runforever(self):
210208

211209
for fd, dispatcher in combined_map.items():
212210
if dispatcher.readable():
213-
self.poller.register_readable(fd)
211+
self.options.poller.register_readable(fd)
214212
if dispatcher.writable():
215-
self.poller.register_writable(fd)
213+
self.options.poller.register_writable(fd)
216214

217-
r, w = self.poller.poll(timeout)
215+
r, w = self.options.poller.poll(timeout)
218216

219217
for fd in r:
220218
if combined_map.has_key(fd):

supervisor/tests/base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def __init__(self):
6767
self.changed_directory = False
6868
self.chdir_error = None
6969
self.umaskset = None
70+
self.poller = DummyPoller(self)
7071

7172
def getLogger(self, *args, **kw):
7273
logger = DummyLogger()
@@ -1035,6 +1036,19 @@ def __init__(self, serial='abc'):
10351036

10361037
def __str__(self):
10371038
return 'dummy event'
1039+
1040+
class DummyPoller:
1041+
def __init__(self, options):
1042+
self.result = [], []
1043+
1044+
def register_readable(self, fd):
1045+
pass
1046+
1047+
def register_writable(self, fd):
1048+
pass
1049+
1050+
def poll(self, timeout):
1051+
return self.result
10381052

10391053
def dummy_handler(event, result):
10401054
pass

supervisor/tests/test_options.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,14 @@ def test_dropPrivileges_nonroot_different_user(self):
15861586
msg = instance.dropPrivileges(42)
15871587
self.assertEqual(msg, "Can't drop privilege as nonroot user")
15881588

1589+
def test_daemonize_notifies_poller_before_and_after_fork(self):
1590+
instance = self._makeOne()
1591+
instance._daemonize = lambda: None
1592+
instance.poller = Mock()
1593+
instance.daemonize()
1594+
instance.poller.before_daemonize.assert_called_once_with()
1595+
instance.poller.after_daemonize.assert_called_once_with()
1596+
15891597
class TestProcessConfig(unittest.TestCase):
15901598
def _getTargetClass(self):
15911599
from supervisor.options import ProcessConfig

supervisor/tests/test_poller.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import unittest
33
import errno
44
import select
5+
from mock import Mock
56

67
from supervisor.poller import SelectPoller, PollPoller, KQueuePoller
78
from supervisor.poller import implements_poll, implements_kqueue
@@ -84,6 +85,7 @@ def test_register_readable(self):
8485
poller = self._makeOne(DummyOptions())
8586
poller._kqueue = kqueue
8687
poller.register_readable(6)
88+
self.assertEqual(list(poller.readables), [6])
8789
self.assertEqual(len(kqueue.registered_kevents), 1)
8890
self.assertReadEventAdded(kqueue, kqueue.registered_kevents[0], 6)
8991

@@ -92,6 +94,7 @@ def test_register_writable(self):
9294
poller = self._makeOne(DummyOptions())
9395
poller._kqueue = kqueue
9496
poller.register_writable(7)
97+
self.assertEqual(list(poller.writables), [7])
9598
self.assertEqual(len(kqueue.registered_kevents), 1)
9699
self.assertWriteEventAdded(kqueue, kqueue.registered_kevents[0], 7)
97100

@@ -144,6 +147,27 @@ def test_poll_uncaught_exception(self):
144147
poller.register_readable(6)
145148
self.assertRaises(OSError, poller.poll, 1000)
146149

150+
def test_before_daemonize_closes_kqueue(self):
151+
mock_kqueue = Mock()
152+
options = DummyOptions()
153+
poller = self._makeOne(options)
154+
poller._kqueue = mock_kqueue
155+
poller.before_daemonize()
156+
mock_kqueue.close.assert_called_once_with()
157+
self.assertEqual(poller._kqueue, None)
158+
159+
def test_after_daemonize_restores_kqueue(self):
160+
options = DummyOptions()
161+
poller = self._makeOne(options)
162+
poller.readables = [1]
163+
poller.writables = [3]
164+
poller.register_readable = Mock()
165+
poller.register_writable = Mock()
166+
poller.after_daemonize()
167+
self.assertTrue(isinstance(poller._kqueue, select.kqueue))
168+
poller.register_readable.assert_called_with(1)
169+
poller.register_writable.assert_called_with(3)
170+
147171
def assertReadEventAdded(self, kqueue, kevent, fd):
148172
self.assertEventAdded(kqueue, kevent, fd, select.KQ_FILTER_READ)
149173

supervisor/tests/test_supervisord.py

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,7 @@ def test_runforever_calls_tick(self):
373373

374374
def test_runforever_poll_dispatchers(self):
375375
options = DummyOptions()
376-
poller = DummyPoller(options)
377-
poller.result = [6], [7, 8]
376+
options.poller.result = [6], [7, 8]
378377
supervisord = self._makeOne(options)
379378
pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
380379
process = DummyProcess(pconfig)
@@ -385,7 +384,6 @@ def test_runforever_poll_dispatchers(self):
385384
error = DummyDispatcher(writable=True, error=OSError)
386385
pgroup.dispatchers = {6:readable, 7:writable, 8:error}
387386
supervisord.process_groups = {'foo': pgroup}
388-
supervisord.poller = poller
389387
options.test = True
390388
supervisord.runforever()
391389
self.assertEqual(pgroup.transitioned, True)
@@ -395,8 +393,7 @@ def test_runforever_poll_dispatchers(self):
395393

396394
def test_runforever_select_dispatcher_exitnow(self):
397395
options = DummyOptions()
398-
poller = DummyPoller(options)
399-
poller.result = [6], []
396+
options.poller.result = [6], []
400397
supervisord = self._makeOne(options)
401398
pconfig = DummyPConfig(options, 'foo', '/bin/foo',)
402399
process = DummyProcess(pconfig)
@@ -406,7 +403,6 @@ def test_runforever_select_dispatcher_exitnow(self):
406403
exitnow = DummyDispatcher(readable=True, error=asyncore.ExitNow)
407404
pgroup.dispatchers = {6:exitnow}
408405
supervisord.process_groups = {'foo': pgroup}
409-
supervisord.poller = poller
410406
options.test = True
411407
self.assertRaises(asyncore.ExitNow, supervisord.runforever)
412408

@@ -507,19 +503,6 @@ def callback(event):
507503
self.assertEqual(len(L), 6)
508504
self.assertEqual(L[-1].__class__, events.Tick3600Event)
509505

510-
class DummyPoller:
511-
def __init__(self, options):
512-
self.result = []
513-
514-
def register_readable(self, fd):
515-
pass
516-
517-
def register_writable(self, fd):
518-
pass
519-
520-
def poll(self, timeout):
521-
return self.result
522-
523506
def test_suite():
524507
return unittest.findTestCases(sys.modules[__name__])
525508

0 commit comments

Comments
 (0)