Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 23 additions & 87 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
# limitations under the License.

import logging
import os
import tempfile

from canonicaljson import json

from twisted.internet import defer

Expand Down Expand Up @@ -99,15 +95,16 @@ def search_users(self, term):
defer.returnValue(ret)

@defer.inlineCallbacks
def exfiltrate_user_data(self, user_id, writer):
"""Write all data we have of the user to the specified directory.
def export_user_data(self, user_id, writer):
"""Write all data we have on the user to the given writer.

Args:
user_id (str)
writer (ExfiltrationWriter)

Returns:
defer.Deferred
defer.Deferred: Resolves when all data for a user has been written.
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = yield self.store.get_rooms_for_user_where_membership_is(
Expand All @@ -134,7 +131,7 @@ def exfiltrate_user_data(self, user_id, writer):

forgotten = yield self.store.did_forget(user_id, room_id)
if forgotten:
logger.info("[%s] User forgot room %d, ignoring", room_id)
logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
continue

if room_id not in rooms_user_has_been_in:
Expand Down Expand Up @@ -172,13 +169,14 @@ def exfiltrate_user_data(self, user_id, writer):
# dict[str, set[str]].
event_to_unseen_prevs = {}

# The reverse mapping to above, i.e. map from unseen event to parent
# events. dict[str, set[str]]
unseen_event_to_parents = {}
# The reverse mapping to above, i.e. map from unseen event to events
# that have the unseen event in their prev_events, i.e. the unseen
# event's "children". dict[str, set[str]]
unseen_to_child_events = {}

# We fetch events in the room the user could see by fetching *all*
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarentee we get everything.
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = yield self.store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction="f"
Expand All @@ -200,16 +198,14 @@ def exfiltrate_user_data(self, user_id, writer):
if unseen_events:
event_to_unseen_prevs[event.event_id] = unseen_events
for unseen in unseen_events:
unseen_event_to_parents.setdefault(unseen, set()).add(
unseen_to_child_events.setdefault(unseen, set()).add(
event.event_id
)

# Now check if this event is an unseen prev event, if so
# then we remove this event from the appropriate dicts.
for event_id in unseen_event_to_parents.pop(event.event_id, []):
event_to_unseen_prevs.get(event_id, set()).discard(
event.event_id
)
for child_id in unseen_to_child_events.pop(event.event_id, []):
event_to_unseen_prevs[child_id].discard(event.event_id)

written_events.add(event.event_id)

Expand All @@ -233,7 +229,7 @@ def exfiltrate_user_data(self, user_id, writer):


class ExfiltrationWriter(object):
"""Interfaced used to specify how to write exfilrated data.
"""Interface used to specify how to write exported data.
"""

def write_events(self, room_id, events):
Expand All @@ -254,7 +250,7 @@ def write_state(self, room_id, event_id, state):
Args:
room_id (str)
event_id (str)
state (list[FrozenEvent])
state (dict[tuple[str, str], FrozenEvent])
"""
pass

Expand All @@ -263,76 +259,16 @@ def write_invite(self, room_id, event, state):

Args:
room_id (str)
invite (FrozenEvent)
state (list[dict]): A subset of the state at the invite, with a
subset of the event keys (type, state_key, content and sender)
event (FrozenEvent)
state (dict[tuple[str, str], dict]): A subset of the state at the
invite, with a subset of the event keys (type, state_key,
content and sender)
"""

def finished(self):
"""Called when exfiltration is complete, and the return valus is passed
to the requester.
"""Called when all data has successfully been exported and written.

This function's return value is passed to the caller of
`export_user_data`.
"""
pass


class FileExfiltrationWriter(ExfiltrationWriter):
    """ExfiltrationWriter that stores a user's data as files under a directory.

    `finished()` hands back the directory the data was written to.

    Args:
        user_id (str): The user whose data is being exfiltrated.
        directory (str|None): Where to write the data. When falsy, a fresh
            temporary directory is created and used instead.
    """

    def __init__(self, user_id, directory=None):
        self.user_id = user_id

        # Fall back to a per-user temporary directory when none was supplied.
        if not directory:
            directory = tempfile.mkdtemp(
                prefix="synapse-exfiltrate__%s__" % (user_id,)
            )
        self.base_directory = directory

        os.makedirs(self.base_directory, exist_ok=True)
        # Refuse to write into a directory that already has content.
        if os.listdir(self.base_directory):
            raise Exception("Directory must be empty")

    def _ensure_room_path(self, room_id, *parts):
        """Create (if needed) and return a directory under the room's folder.

        Args:
            room_id (str)
            *parts (str): Optional sub-directory components below the room
                directory.

        Returns:
            str: Path of the directory, which exists once this returns.
        """
        path = os.path.join(self.base_directory, "rooms", room_id, *parts)
        os.makedirs(path, exist_ok=True)
        return path

    def write_events(self, room_id, events):
        """Append each event's PDU JSON, one per line, to the room's
        "events" file.
        """
        target = os.path.join(self._ensure_room_path(room_id), "events")

        with open(target, "a") as out:
            for ev in events:
                print(json.dumps(ev.get_pdu_json()), file=out)

    def write_state(self, room_id, event_id, state):
        """Append the PDU JSON of every event in `state.values()`, one per
        line, to a file named after `event_id` in the room's "state"
        directory.
        """
        target = os.path.join(self._ensure_room_path(room_id, "state"), event_id)

        with open(target, "a") as out:
            for ev in state.values():
                print(json.dumps(ev.get_pdu_json()), file=out)

    def write_invite(self, room_id, event, state):
        """Record the invite event, then append the invite state to the
        room's "invite_state" file.
        """
        self.write_events(room_id, [event])

        # The invite state entries are serialised as-is (no `get_pdu_json`),
        # so they are kept in their own file rather than alongside the
        # room's events.
        target = os.path.join(self._ensure_room_path(room_id), "invite_state")

        with open(target, "a") as out:
            for entry in state.values():
                print(json.dumps(entry), file=out)

    def finished(self):
        """Return the directory the data was written to."""
        return self.base_directory
16 changes: 10 additions & 6 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,9 @@ def _paginate_room_events_txn(
Returns:
Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
as a list of _EventDictReturn and a token that points to the end
of the result set.
of the result set. If no events are returned then the end of the
stream has been reached (i.e. there are no events between
`from_token` and `to_token`).
"""

assert int(limit) >= 0
Expand Down Expand Up @@ -905,15 +907,17 @@ def paginate_room_events(
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
limit (int): The maximum number of events to return.
event_filter (Filter|None): If provided filters the events to
those that match the filter.

Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "topological_ordering" and "stream_orderign".
tuple[list[FrozenEvent], str]: Returns the results as a list of
dicts and a token that points to the end of the result set. The
dicts have the keys "event_id", "topological_ordering" and
"stream_ordering". If no events are returned then the end of the
stream has been reached (i.e. there are no events between
`from_key` and `to_key`).
"""

from_key = RoomStreamToken.parse(from_key)
Expand Down
10 changes: 5 additions & 5 deletions tests/handlers/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_single_public_joined_room(self):

writer = Mock()

self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer))
self.get_success(self.admin_handler.export_user_data(self.user2, writer))

writer.write_events.assert_called()

Expand Down Expand Up @@ -94,7 +94,7 @@ def test_single_private_joined_room(self):

writer = Mock()

self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer))
self.get_success(self.admin_handler.export_user_data(self.user2, writer))

writer.write_events.assert_called()

Expand Down Expand Up @@ -127,7 +127,7 @@ def test_single_left_room(self):

writer = Mock()

self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer))
self.get_success(self.admin_handler.export_user_data(self.user2, writer))

writer.write_events.assert_called()

Expand Down Expand Up @@ -169,7 +169,7 @@ def test_single_left_rejoined_private_room(self):

writer = Mock()

self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer))
self.get_success(self.admin_handler.export_user_data(self.user2, writer))

writer.write_events.assert_called_once()

Expand Down Expand Up @@ -198,7 +198,7 @@ def test_invite(self):

writer = Mock()

self.get_success(self.admin_handler.exfiltrate_user_data(self.user2, writer))
self.get_success(self.admin_handler.export_user_data(self.user2, writer))

writer.write_events.assert_not_called()
writer.write_state.assert_not_called()
Expand Down