85 changes: 42 additions & 43 deletions ml-agents/mlagents/trainers/demo_loader.py
@@ -1,4 +1,3 @@
import pathlib
import logging
import os
from typing import List, Tuple
@@ -28,16 +27,12 @@ def make_demo_buffer(
# Create and populate buffer using experiences
demo_raw_buffer = AgentBuffer()
demo_processed_buffer = AgentBuffer()
for idx, current_pair_info in enumerate(pair_infos):
if idx > len(pair_infos) - 2:
break
next_pair_info = pair_infos[idx + 1]
current_brain_info = BrainInfo.from_agent_proto(
0, [current_pair_info.agent_info], brain_params
)
next_brain_info = BrainInfo.from_agent_proto(
0, [next_pair_info.agent_info], brain_params
)
brain_infos = [
Contributor:

I'm a little worried that this will eat up a lot of memory for large demo files, since it will decompress everything at once (in the original issue, demos were 1GB and that includes compressed visual observations). I think a better approach would be something like (very rough sketch):

current_brain_info = BrainInfo.from_agent_proto(0, [pair_infos[0].agent_info], brain_params)  # TODO handle empty pair_infos
for idx, current_pair_info in enumerate(pair_infos):
  # (don't calculate current_brain_info)
  # (rest of the loop stays the same)
  current_brain_info = next_brain_info

Does that sound reasonable?
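
For illustration, a slightly fuller version of that sketch (not the PR's code; it assumes the rest of the existing loop body stays the same and that an empty pair_infos list is simply skipped):

if pair_infos:
    current_brain_info = BrainInfo.from_agent_proto(
        0, [pair_infos[0].agent_info], brain_params
    )
    for idx in range(len(pair_infos) - 1):
        # Convert only the *next* pair here; the previous iteration's
        # next_brain_info is reused as this iteration's current_brain_info.
        next_brain_info = BrainInfo.from_agent_proto(
            0, [pair_infos[idx + 1].agent_info], brain_params
        )
        # ... rest of the existing loop body, unchanged ...
        current_brain_info = next_brain_info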

Contributor Author:

Yes, great suggestion! This way, the entire demonstration file doesn't need to be in memory at once. I want to write a test for the demonstration loading that verifies that the loading is still being done correctly. My idea is to take the old implementation, load the demo files in test_demo_dir with demo_to_buffer and then pickle the returned objects and save them somewhere as the "correct answer." Then, in test_demo_loader.py, load in the pickled objects and compare them against what the current implementation of demo_to_buffer is returning.

The problem is that the AgentBuffer and BrainParameters classes don't have defined __eq__ methods yet. Any ideas? Should I go through with the test or not? If so, what should my next step be?

Contributor:

This way, the entire demonstration file doesn't need to be in memory at once.

The way I proposed still has the protobuf objects loaded all at once (this is the current behavior) but it at least doesn't increase the peak. Adding things to the buffer as soon as they're read from the file is another option, but could probably wait until another PR.

Instead of saving pickled data, how about writing new demonstration files and comparing the values? We don't currently have a python implementation of this, but I think it would be useful to have anyway.

You can see how this is done in an old gist that I wrote: https://gist.github.com/chriselion/3714d05255eea2f9132b96a182fbdcaa#file-convert_demo-py-L101-L115
(the structures being written might have changed a bit, but the overall format is the same).
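
For reference, a minimal sketch of what such a Python writer might look like, assuming the same length-delimited proto layout that load_demonstration parses; write_delimited and write_demo_file are hypothetical helpers rather than an existing ml-agents API, and _EncodeVarint comes from protobuf's internal encoder:

from google.protobuf.internal.encoder import _EncodeVarint

def write_delimited(f, message):
    # Varint length prefix followed by the serialized message bytes.
    serialized = message.SerializeToString()
    _EncodeVarint(f.write, len(serialized))
    f.write(serialized)

def write_demo_file(path, meta_proto, brain_param_proto, agent_info_action_pairs):
    with open(path, "wb") as f:
        write_delimited(f, meta_proto)
        f.seek(33)  # load_demonstration expects the brain parameters to start at byte 33
        write_delimited(f, brain_param_proto)
        for pair in agent_info_action_pairs:
            write_delimited(f, pair)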

So the test would be something like

  1. Create new BrainParameters and AgentInfo protos with specific values
  2. Save as a new demonstration to a temp directory (or temp file)
  3. Load and compare to the original values

Adding __eq__ to BrainParameters sounds fine if you think it would help. I'm not sure it's necessary for AgentBuffer since that inherits from dict (whether or not it should is another discussion).
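
If it helps, a minimal sketch of such an __eq__, assuming all relevant BrainParameters state lives in plain instance attributes:

def __eq__(self, other):
    # Added to BrainParameters; compares every instance attribute.
    if not isinstance(other, BrainParameters):
        return NotImplemented
    return self.__dict__ == other.__dict__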

Contributor Author:

I don't think I quite get what you mean. On every iteration through the for loop (in your proposed implementation), don't we lose an object reference? In particular, the object that current_brain_info referred to before current_brain_info and next_brain_info were updated. Once that reference is lost, wouldn't Python garbage-collect the data, so that not all of the demonstration file is in memory at once?

And, just so I'm clear, are you proposing to convert the loaded demonstration contents back into a demonstration file (in a temporary directory) and then directly compare the file contents? I was also wondering whether we want tests for different sequence_lengths for the make_demo_buffer function. Wouldn't this approach only work for sequence_length = 1?

Contributor:

Sorry for the delay.

By "still has the protobuf objects loaded all at once", I meant the pair_infos list. If we wanted to reduce the total memory, we could combine load_demonstration and make_demo_buffer so that the make_demo_buffer() loop logic happens right after agent_info_action.ParseFromString(). But I don't think we should worry about this for now; I just don't want to increase the peak usage, but we don't need to try to decrease it.

For a test, I'm suggesting something like

  1. Make temporary directory
  2. Create BrainParameters and AgentInfos
  3. Write demo file using BrainParameters and AgentInfos to temp directory
  4. Run load_demonstration() and make_demo_buffer() to get a buffer
  5. Make sure the result from 4 agrees with what should be produced from 2 (I'm not exactly sure what this looks like off the top of my head, and not quite sure about the sequence length)
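
A rough sketch of those five steps as a test, reusing the hypothetical write_demo_file helper sketched earlier; the proto field values are placeholders, and real AgentInfoActionPairProto messages would need observations and actions consistent with the brain parameters:

import os
import tempfile

def test_demo_round_trip():
    with tempfile.TemporaryDirectory() as tmpdir:                    # step 1
        brain_proto = BrainParametersProto()                         # step 2
        meta_proto = DemonstrationMetaProto()
        pairs = [AgentInfoActionPairProto() for _ in range(5)]
        meta_proto.number_steps = len(pairs)
        demo_path = os.path.join(tmpdir, "roundtrip.demo")
        write_demo_file(demo_path, meta_proto, brain_proto, pairs)   # step 3
        brain_params, loaded_pairs, total_expected = load_demonstration(demo_path)  # step 4
        _, demo_buffer = demo_to_buffer(demo_path, 1)
        assert total_expected == len(pairs)                          # step 5
        assert len(loaded_pairs) == len(pairs)
        assert len(demo_buffer["actions"]) == len(pairs) - 1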

Contributor:

A logistical note: there are some more changes coming to this surrounding code in the next week or so. To minimize the number of merge conflicts you'll have to deal with, let's try to get the get_demo_files() and load_demonstration() changes merged, and come back to the make_demo_buffer / proto conversion change after.

Can you either undo the changes to make_demo_buffer(), or move them to another PR and I'll approve that? (the former is probably easier)

Contributor Author:

@chriselion Sorry for the delay. School just started for me and it took a while to get myself settled in! I'll undo the changes now so that you can merge the get_demo_files() and load_demonstration() changes!

Contributor:

No worries. Let me know if you want to keep working on this, or want to try something else.

Contributor Author:

@chriselion I really want to complete this first assignment, and I'll try to do so by the end of this week! But, that being said, I'd really appreciate it if you could start to onboard me to the next thing I could be working on at the same time. Much appreciated!

BrainInfo.from_agent_proto(0, [pair_info.agent_info], brain_params)
for pair_info in pair_infos
]
for idx in range(len(brain_infos) - 1):
current_brain_info, next_brain_info = brain_infos[idx : idx + 2]
previous_action = (
np.array(pair_infos[idx].action_info.vector_actions, dtype=np.float32) * 0
)
@@ -55,7 +50,7 @@ def make_demo_buffer(
demo_raw_buffer["vector_obs"].append(
current_brain_info.vector_observations[0]
)
demo_raw_buffer["actions"].append(current_pair_info.action_info.vector_actions)
demo_raw_buffer["actions"].append(pair_infos[idx].action_info.vector_actions)
demo_raw_buffer["prev_action"].append(previous_action)
if next_brain_info.local_done[0]:
demo_raw_buffer.resequence_and_append(
@@ -83,39 +78,43 @@ def demo_to_buffer(
return brain_params, demo_buffer


@timed
def load_demonstration(
file_path: str
) -> Tuple[BrainParameters, List[AgentInfoActionPairProto], int]:
"""
Loads and parses a demonstration file.
:param file_path: Location of demonstration file (.demo).
:return: BrainParameter and list of AgentInfoActionPairProto containing demonstration data.
def get_demo_files(path: str) -> List[str]:
"""
Retrieves the demonstration file(s) from a path.
:param path: Path of demonstration file or directory.
:return: List of demonstration files

# First 32 bytes of file dedicated to meta-data.
INITIAL_POS = 33
file_paths = []
if os.path.isdir(file_path):
all_files = os.listdir(file_path)
for _file in all_files:
if _file.endswith(".demo"):
file_paths.append(os.path.join(file_path, _file))
if not all_files:
Raises errors if |path| is invalid.
"""
if os.path.isfile(path):
if not path.endswith(".demo"):
raise ValueError("The path provided is not a '.demo' file.")
return [path]
elif os.path.isdir(path):
paths = [
os.path.join(path, name)
for name in os.listdir(path)
if name.endswith(".demo")
]
if not paths:
raise ValueError("There are no '.demo' files in the provided directory.")
elif os.path.isfile(file_path):
file_paths.append(file_path)
file_extension = pathlib.Path(file_path).suffix
if file_extension != ".demo":
raise ValueError(
"The file is not a '.demo' file. Please provide a file with the "
"correct extension."
)
return paths
else:
raise FileNotFoundError(
"The demonstration file or directory {} does not exist.".format(file_path)
f"The demonstration file or directory {path} does not exist."
)


@timed
def load_demonstration(
path: str,
) -> Tuple[BrainParameters, List[AgentInfoActionPairProto], int]:
"""
Loads and parses a demonstration file or directory.
:param path: File or directory.
:return: BrainParameter and list of AgentInfoActionPairProto containing demonstration data.
"""
file_paths = get_demo_files(path)
brain_params = None
brain_param_proto = None
info_action_pairs = []
@@ -131,12 +130,14 @@ def load_demonstration(
meta_data_proto = DemonstrationMetaProto()
meta_data_proto.ParseFromString(data[pos : pos + next_pos])
total_expected += meta_data_proto.number_steps
# first 32 bytes of file dedicated to metadata
INITIAL_POS = 33
pos = INITIAL_POS
if obs_decoded == 1:
elif obs_decoded == 1:
brain_param_proto = BrainParametersProto()
brain_param_proto.ParseFromString(data[pos : pos + next_pos])
pos += next_pos
if obs_decoded > 1:
else:
agent_info_action = AgentInfoActionPairProto()
agent_info_action.ParseFromString(data[pos : pos + next_pos])
if brain_params is None:
@@ -149,7 +150,5 @@ def load_demonstration(
pos += next_pos
obs_decoded += 1
if not brain_params:
raise RuntimeError(
f"No BrainParameters found in demonstration file at {file_path}."
)
raise RuntimeError(f"No BrainParameters found in demonstration file at {path}.")
return brain_params, info_action_pairs, total_expected
37 changes: 34 additions & 3 deletions ml-agents/mlagents/trainers/tests/test_demo_loader.py
@@ -1,10 +1,17 @@
import os
import pytest
import tempfile

from mlagents.trainers.demo_loader import load_demonstration, demo_to_buffer
from mlagents.trainers.demo_loader import (
load_demonstration,
demo_to_buffer,
get_demo_files,
)

path_prefix = os.path.dirname(os.path.abspath(__file__))


def test_load_demo():
path_prefix = os.path.dirname(os.path.abspath(__file__))
brain_parameters, pair_infos, total_expected = load_demonstration(
path_prefix + "/test.demo"
)
@@ -17,7 +24,6 @@ def test_load_demo():


def test_load_demo_dir():
path_prefix = os.path.dirname(os.path.abspath(__file__))
brain_parameters, pair_infos, total_expected = load_demonstration(
path_prefix + "/test_demo_dir"
)
@@ -27,3 +33,28 @@ def test_load_demo_dir():

_, demo_buffer = demo_to_buffer(path_prefix + "/test_demo_dir", 1)
assert len(demo_buffer["actions"]) == total_expected - 1


def test_edge_cases():
# nonexistent file and directory
with pytest.raises(FileNotFoundError):
get_demo_files(os.path.join(path_prefix, "nonexistent_file.demo"))
with pytest.raises(FileNotFoundError):
get_demo_files(os.path.join(path_prefix, "nonexistent_directory"))
with tempfile.TemporaryDirectory() as tmpdirname:
# empty directory
with pytest.raises(ValueError):
get_demo_files(tmpdirname)
# invalid file
invalid_fname = tmpdirname + "/mydemo.notademo"
with open(invalid_fname, "w") as f:
f.write("I'm not a demo")
with pytest.raises(ValueError):
Contributor:

You should also confirm that get_demo_files(tmpdirname) fails here too (since there is a file but it gets filtered out).
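
For illustration, that extra check might look like this (at this point the temp directory holds only mydemo.notademo, which get_demo_files filters out):

with pytest.raises(ValueError):
    get_demo_files(tmpdirname)  # directory exists but contains no .demo files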

get_demo_files(invalid_fname)
# valid file
valid_fname = tmpdirname + "/mydemo.demo"
with open(valid_fname, "w") as f:
f.write("I'm a demo file")
assert get_demo_files(valid_fname) == [valid_fname]
# valid directory
assert get_demo_files(tmpdirname) == [valid_fname]