Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c831c5b

Browse files
authored
Merge pull request #5597 from matrix-org/erikj/admin_api_cmd
Create basic admin command app
2 parents 18c5166 + 5ed7853 commit c831c5b

File tree

8 files changed

+357
-13
lines changed

8 files changed

+357
-13
lines changed

changelog.d/5597.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a basic admin command app to allow server operators to run Synapse admin commands separately from the main production instance.

synapse/app/_base.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def register_sighup(func):
4848
_sighup_callbacks.append(func)
4949

5050

51-
def start_worker_reactor(appname, config):
51+
def start_worker_reactor(appname, config, run_command=reactor.run):
5252
""" Run the reactor in the main process
5353
5454
Daemonizes if necessary, and then configures some resources, before starting
@@ -57,6 +57,7 @@ def start_worker_reactor(appname, config):
5757
Args:
5858
appname (str): application name which will be sent to syslog
5959
config (synapse.config.Config): config object
60+
run_command (Callable[]): callable that actually runs the reactor
6061
"""
6162

6263
logger = logging.getLogger(config.worker_app)
@@ -69,11 +70,19 @@ def start_worker_reactor(appname, config):
6970
daemonize=config.worker_daemonize,
7071
print_pidfile=config.print_pidfile,
7172
logger=logger,
73+
run_command=run_command,
7274
)
7375

7476

7577
def start_reactor(
76-
appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
78+
appname,
79+
soft_file_limit,
80+
gc_thresholds,
81+
pid_file,
82+
daemonize,
83+
print_pidfile,
84+
logger,
85+
run_command=reactor.run,
7786
):
7887
""" Run the reactor in the main process
7988
@@ -88,6 +97,7 @@ def start_reactor(
8897
daemonize (bool): true to run the reactor in a background process
8998
print_pidfile (bool): whether to print the pid file, if daemonize is True
9099
logger (logging.Logger): logger instance to pass to Daemonize
100+
run_command (Callable[]): callable that actually runs the reactor
91101
"""
92102

93103
install_dns_limiter(reactor)
@@ -97,7 +107,7 @@ def run():
97107
change_resource_limit(soft_file_limit)
98108
if gc_thresholds:
99109
gc.set_threshold(*gc_thresholds)
100-
reactor.run()
110+
run_command()
101111

102112
# make sure that we run the reactor with the sentinel log context,
103113
# otherwise other PreserveLoggingContext instances will get confused

synapse/app/admin_cmd.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
# Copyright 2019 Matrix.org Foundation C.I.C.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
import argparse
17+
import logging
18+
import os
19+
import sys
20+
import tempfile
21+
22+
from canonicaljson import json
23+
24+
from twisted.internet import defer, task
25+
26+
import synapse
27+
from synapse.app import _base
28+
from synapse.config._base import ConfigError
29+
from synapse.config.homeserver import HomeServerConfig
30+
from synapse.config.logger import setup_logging
31+
from synapse.handlers.admin import ExfiltrationWriter
32+
from synapse.replication.slave.storage._base import BaseSlavedStore
33+
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
34+
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
35+
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
36+
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
37+
from synapse.replication.slave.storage.devices import SlavedDeviceStore
38+
from synapse.replication.slave.storage.events import SlavedEventStore
39+
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
40+
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
41+
from synapse.replication.slave.storage.presence import SlavedPresenceStore
42+
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
43+
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
44+
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
45+
from synapse.replication.slave.storage.room import RoomStore
46+
from synapse.replication.tcp.client import ReplicationClientHandler
47+
from synapse.server import HomeServer
48+
from synapse.storage.engines import create_engine
49+
from synapse.util.logcontext import LoggingContext
50+
from synapse.util.versionstring import get_version_string
51+
52+
logger = logging.getLogger("synapse.app.admin_cmd")
53+
54+
55+
class AdminCmdSlavedStore(
56+
SlavedReceiptsStore,
57+
SlavedAccountDataStore,
58+
SlavedApplicationServiceStore,
59+
SlavedRegistrationStore,
60+
SlavedFilteringStore,
61+
SlavedPresenceStore,
62+
SlavedGroupServerStore,
63+
SlavedDeviceInboxStore,
64+
SlavedDeviceStore,
65+
SlavedPushRuleStore,
66+
SlavedEventStore,
67+
SlavedClientIpStore,
68+
RoomStore,
69+
BaseSlavedStore,
70+
):
71+
pass
72+
73+
74+
class AdminCmdServer(HomeServer):
75+
DATASTORE_CLASS = AdminCmdSlavedStore
76+
77+
def _listen_http(self, listener_config):
78+
pass
79+
80+
def start_listening(self, listeners):
81+
pass
82+
83+
def build_tcp_replication(self):
84+
return AdminCmdReplicationHandler(self)
85+
86+
87+
class AdminCmdReplicationHandler(ReplicationClientHandler):
88+
@defer.inlineCallbacks
89+
def on_rdata(self, stream_name, token, rows):
90+
pass
91+
92+
def get_streams_to_replicate(self):
93+
return {}
94+
95+
96+
@defer.inlineCallbacks
97+
def export_data_command(hs, args):
98+
"""Export data for a user.
99+
100+
Args:
101+
hs (HomeServer)
102+
args (argparse.Namespace)
103+
"""
104+
105+
user_id = args.user_id
106+
directory = args.output_directory
107+
108+
res = yield hs.get_handlers().admin_handler.export_user_data(
109+
user_id, FileExfiltrationWriter(user_id, directory=directory)
110+
)
111+
print(res)
112+
113+
114+
class FileExfiltrationWriter(ExfiltrationWriter):
115+
"""An ExfiltrationWriter that writes the users data to a directory.
116+
Returns the directory location on completion.
117+
118+
Note: This writes to disk on the main reactor thread.
119+
120+
Args:
121+
user_id (str): The user whose data is being exfiltrated.
122+
directory (str|None): The directory to write the data to, if None then
123+
will write to a temporary directory.
124+
"""
125+
126+
def __init__(self, user_id, directory=None):
127+
self.user_id = user_id
128+
129+
if directory:
130+
self.base_directory = directory
131+
else:
132+
self.base_directory = tempfile.mkdtemp(
133+
prefix="synapse-exfiltrate__%s__" % (user_id,)
134+
)
135+
136+
os.makedirs(self.base_directory, exist_ok=True)
137+
if list(os.listdir(self.base_directory)):
138+
raise Exception("Directory must be empty")
139+
140+
def write_events(self, room_id, events):
141+
room_directory = os.path.join(self.base_directory, "rooms", room_id)
142+
os.makedirs(room_directory, exist_ok=True)
143+
events_file = os.path.join(room_directory, "events")
144+
145+
with open(events_file, "a") as f:
146+
for event in events:
147+
print(json.dumps(event.get_pdu_json()), file=f)
148+
149+
def write_state(self, room_id, event_id, state):
150+
room_directory = os.path.join(self.base_directory, "rooms", room_id)
151+
state_directory = os.path.join(room_directory, "state")
152+
os.makedirs(state_directory, exist_ok=True)
153+
154+
event_file = os.path.join(state_directory, event_id)
155+
156+
with open(event_file, "a") as f:
157+
for event in state.values():
158+
print(json.dumps(event.get_pdu_json()), file=f)
159+
160+
def write_invite(self, room_id, event, state):
161+
self.write_events(room_id, [event])
162+
163+
# We write the invite state somewhere else as they aren't full events
164+
# and are only a subset of the state at the event.
165+
room_directory = os.path.join(self.base_directory, "rooms", room_id)
166+
os.makedirs(room_directory, exist_ok=True)
167+
168+
invite_state = os.path.join(room_directory, "invite_state")
169+
170+
with open(invite_state, "a") as f:
171+
for event in state.values():
172+
print(json.dumps(event), file=f)
173+
174+
def finished(self):
175+
return self.base_directory
176+
177+
178+
def start(config_options):
179+
parser = argparse.ArgumentParser(description="Synapse Admin Command")
180+
HomeServerConfig.add_arguments_to_parser(parser)
181+
182+
subparser = parser.add_subparsers(
183+
title="Admin Commands",
184+
required=True,
185+
dest="command",
186+
metavar="<admin_command>",
187+
help="The admin command to perform.",
188+
)
189+
export_data_parser = subparser.add_parser(
190+
"export-data", help="Export all data for a user"
191+
)
192+
export_data_parser.add_argument("user_id", help="User to extra data from")
193+
export_data_parser.add_argument(
194+
"--output-directory",
195+
action="store",
196+
metavar="DIRECTORY",
197+
required=False,
198+
help="The directory to store the exported data in. Must be empty. Defaults"
199+
" to creating a temp directory.",
200+
)
201+
export_data_parser.set_defaults(func=export_data_command)
202+
203+
try:
204+
config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
205+
except ConfigError as e:
206+
sys.stderr.write("\n" + str(e) + "\n")
207+
sys.exit(1)
208+
209+
if config.worker_app is not None:
210+
assert config.worker_app == "synapse.app.admin_cmd"
211+
212+
# Update the config with some basic overrides so that don't have to specify
213+
# a full worker config.
214+
config.worker_app = "synapse.app.admin_cmd"
215+
216+
if (
217+
not config.worker_daemonize
218+
and not config.worker_log_file
219+
and not config.worker_log_config
220+
):
221+
# Since we're meant to be run as a "command" let's not redirect stdio
222+
# unless we've actually set log config.
223+
config.no_redirect_stdio = True
224+
225+
# Explicitly disable background processes
226+
config.update_user_directory = False
227+
config.start_pushers = False
228+
config.send_federation = False
229+
230+
setup_logging(config, use_worker_options=True)
231+
232+
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
233+
234+
database_engine = create_engine(config.database_config)
235+
236+
ss = AdminCmdServer(
237+
config.server_name,
238+
db_config=config.database_config,
239+
config=config,
240+
version_string="Synapse/" + get_version_string(synapse),
241+
database_engine=database_engine,
242+
)
243+
244+
ss.setup()
245+
246+
# We use task.react as the basic run command as it correctly handles tearing
247+
# down the reactor when the deferreds resolve and setting the return value.
248+
# We also make sure that `_base.start` gets run before we actually run the
249+
# command.
250+
251+
@defer.inlineCallbacks
252+
def run(_reactor):
253+
with LoggingContext("command"):
254+
yield _base.start(ss, [])
255+
yield args.func(ss, args)
256+
257+
_base.start_worker_reactor(
258+
"synapse-admin-cmd", config, run_command=lambda: task.react(run)
259+
)
260+
261+
262+
if __name__ == "__main__":
263+
with LoggingContext("main"):
264+
start(sys.argv[1:])

0 commit comments

Comments
 (0)