Merged
Changes from 15 commits

Commits (32)
6f75e51
Implement feature to search jobs with a sp- and doc-integrated filter.
csadorf May 19, 2019
531c07e
A JobsCursor always uses an integrated filter.
csadorf May 19, 2019
93a893d
Extend unit tests to cover integrated and mixed filters.
csadorf May 19, 2019
d5a55c4
Fix bugs with respect to mixed and integrated filters.
csadorf May 19, 2019
99a5847
Fix bug in unit test implementation.
csadorf May 19, 2019
9860674
merge master
vishav1771 Apr 27, 2020
5eb7c75
cleanup and updated tests
vishav1771 Apr 29, 2020
62a7a75
error
vishav1771 Apr 29, 2020
93c237e
linting
vishav1771 Apr 29, 2020
7c038ac
changes
vishav1771 Apr 29, 2020
6c1228f
changes
vishav1771 Apr 29, 2020
8fb9bd0
error
vishav1771 Apr 29, 2020
0987153
Changed groupby
vishav1771 May 10, 2020
b4df5ea
Merge master
vishav1771 May 10, 2020
007afcd
Updated doc-string of groupby
vishav1771 May 10, 2020
70ce707
Changes
vishav1771 May 12, 2020
aec0e78
Added Comment to xfail
vishav1771 May 12, 2020
4a734eb
Linting Change
vishav1771 May 12, 2020
b275b5d
Reverting init change
vishav1771 May 12, 2020
28b88ef
changes
vishav1771 May 14, 2020
3643f72
init change
vishav1771 May 19, 2020
bae6744
Changes
vishav1771 May 21, 2020
3859368
Lint
vishav1771 May 21, 2020
f5e2fc1
Merge branch 'master' into feature/integrated-queries
csadorf May 25, 2020
29c7c84
Merge remote-tracking branch 'origin/master' into feature/integrated-…
vyasr Feb 2, 2021
6ba435d
Address PR comments, fix a bug found in the process, remove docstring…
vyasr Feb 10, 2021
2f6ca87
Merge branch 'master' into feature/integrated-queries
vyasr Feb 10, 2021
f5eb165
add a test for regex
mikemhenry Feb 10, 2021
9870ce5
Add a few more tests of basic behavior.
vyasr Feb 10, 2021
8f9504e
Update changelog.
vyasr Feb 10, 2021
b81817c
Update changelog.
vyasr Feb 10, 2021
0941688
Fix typo in changelog.
vyasr Feb 10, 2021
73 changes: 59 additions & 14 deletions signac/contrib/filterparse.py
@@ -68,19 +68,19 @@ def _cast(x):
     return x


-def _parse_simple(key, value=None):
+def _parse_single(key, value=None):
     if value is None or value == '!':
-        return {key: {'$exists': True}}
+        return key, {'$exists': True}
     elif _is_json(value):
-        return {key: _parse_json(value)}
+        return key, _parse_json(value)
     elif _is_regex(value):
-        return {key: {'$regex': value[1:-1]}}
+        return key, {'$regex': value[1:-1]}
     elif _is_json(key):
         raise ValueError(
             "Please check your filter arguments. "
             "Using as JSON expression as key is not allowed: '{}'.".format(key))
     else:
-        return {key: _cast(value)}
+        return key, _cast(value)


 def parse_filter_arg(args, file=sys.stderr):
@@ -90,14 +90,59 @@ def parse_filter_arg(args, file=sys.stderr):
         if _is_json(args[0]):
             return _parse_json(args[0])
         else:
-            return _with_message(_parse_simple(args[0]), file)
+            key, value = _parse_single(args[0])
+            return _with_message({key: value}, file)
     else:
-        q = dict()
-        for i in range(0, len(args), 2):
-            key = args[i]
-            if i+1 < len(args):
-                value = args[i+1]
-            else:
-                value = None
-            q.update(_parse_simple(key, value))
+        q = dict(parse_simple(args))
+
         return _with_message(q, file)
+
+
+def parse_simple(tokens):
+    for i in range(0, len(tokens), 2):
+        key = tokens[i]
+        if i+1 < len(tokens):
+            value = tokens[i+1]
+        else:
+            value = None
+        yield _parse_single(key, value)
+
+
+def _add_prefix(filter, prefix):
+    for key, value in filter:
+        if key in ('$and', '$or'):
+            if isinstance(value, list) or isinstance(value, tuple):
+                yield key, [dict(_add_prefix(item.items(), prefix)) for item in value]
+            else:
+                raise ValueError(
+                    "The argument to a logical operator must be a sequence (e.g. a list)!")
+        elif '.' in key and key.split('.', 1)[0] in ('sp', 'doc'):
+            yield key, value
+        elif key in ('sp', 'doc'):
+            yield key, value
+        else:
+            yield prefix + '.' + key, value
+
+
+def _root_keys(filter):
+    for key, value in filter.items():
+        if key in ('$and', '$or'):
+            assert isinstance(value, (list, tuple))
+            for item in value:
+                for key in _root_keys(item):
+                    yield key
+        elif '.' in key:
+            yield key.split('.', 1)[0]
+        else:
+            yield key
+
+
+def _parse_filter(filter):
+    if isinstance(filter, str):
+        yield from parse_simple(filter.split())
+    elif filter:
+        yield from filter.items()
+
+
+def parse_filter(filter, prefix='sp'):
+    yield from _add_prefix(_parse_filter(filter), prefix)
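
For orientation, here is a sketch of how the new public entry point behaves. This session is illustrative and not part of the diff; it assumes this branch is installed so that signac.contrib.filterparse is importable.

# Illustrative sketch of the new integrated-filter parsing (not part of the diff).
from signac.contrib.filterparse import parse_filter

# Unprefixed keys are interpreted as state point keys and receive the 'sp.' prefix.
print(dict(parse_filter({'a': 1})))
# {'sp.a': 1}

# Keys already prefixed with 'sp.' or 'doc.' pass through unchanged, so state
# point and document criteria can be mixed in a single filter.
print(dict(parse_filter({'a': 1, 'doc.b': 2})))
# {'sp.a': 1, 'doc.b': 2}

# _add_prefix recurses into '$and'/'$or' expressions.
print(dict(parse_filter({'$or': [{'a': 1}, {'doc.b': 2}]})))
# {'$or': [{'sp.a': 1}, {'doc.b': 2}]}

# String filters are tokenized into key/value pairs by parse_simple.
print(dict(parse_filter('a 1 doc.b 2')))
# {'sp.a': 1, 'doc.b': 2}

_root_keys supports the indexing decision in Project._find_job_ids below: it reports the top-level prefixes of a parsed filter, so the project only loads job documents into the index when 'doc' actually appears in the query.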
2 changes: 1 addition & 1 deletion signac/contrib/import_export.py
@@ -48,7 +48,7 @@ def _make_schema_based_path_function(jobs, exclude_keys=None, delimiter_nested=
         # signature of the path function below.
         return lambda job, sep=None: ''

-    index = [{'_id': job._id, 'statepoint': job.sp()} for job in jobs]
+    index = [{'_id': job._id, 'sp': job.sp()} for job in jobs]
     jsi = _build_job_statepoint_index(jobs=jobs, exclude_const=True, index=index)
     sp_index = OrderedDict(jsi)
4 changes: 2 additions & 2 deletions signac/contrib/linked_view.py
@@ -25,10 +25,10 @@ def create_linked_view(project, prefix=None, job_ids=None, index=None, path=None

     if index is None:
         if job_ids is None:
-            index = [{'_id': job._id, 'statepoint': job.sp()} for job in project]
+            index = [{'_id': job._id, 'sp': job.sp()} for job in project]
             jobs = list(project)
         else:
-            index = [{'_id': job_id, 'statepoint': project.open_job(id=job_id).sp()}
+            index = [{'_id': job_id, 'sp': project.open_job(id=job_id).sp()}
                      for job_id in job_ids]
             jobs = list(project.open_job(id=job_id) for job_id in job_ids)
     elif job_ids is not None:
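
Both renames above belong to the same schema change: index documents now store the state point under 'sp' (matching the new filter prefix) rather than 'statepoint'. A hedged sketch of the resulting index-entry shape, with an invented id and values; per the _build_index hunk in project.py below, the job document is nested under a separate 'doc' key when it is indexed:

# Hypothetical index entry after this change (id and values invented).
index_entry = {
    '_id': '0d55c362...',            # job id (placeholder)
    'sp': {'a': 1, 'b': 2},          # was: 'statepoint': {...}
    'doc': {'converged': True},      # present only with include_job_document=True
}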
133 changes: 89 additions & 44 deletions signac/contrib/project.py
@@ -35,6 +35,7 @@
 from .errors import WorkspaceError
 from .errors import DestinationExistsError
 from .errors import JobsCorruptedError
+from .filterparse import parse_filter, _root_keys
 from .errors import IncompatibleSchemaVersion

 logger = logging.getLogger(__name__)
@@ -76,17 +77,23 @@ def __init__(self, index, _trust=False):
     def __len__(self):
         return len(self._collection)

-    def _resolve_statepoint_filter(self, q):
-        for k, v in q.items():
-            if k in ('$and', '$or'):
-                if not isinstance(v, list) or isinstance(v, tuple):
-                    raise ValueError(
-                        "The argument to a logical operator must be a sequence (e.g. a list)!")
-                yield k, [dict(self._resolve_statepoint_filter(i)) for i in v]
-            else:
-                yield 'statepoint.{}'.format(k), v
-
-    def find_job_ids(self, filter=None, doc_filter=None):
-        """Find the job_ids of all jobs matching the filters.
-
-        The optional filter arguments must be a Mapping of key-value
-        pairs and JSON serializable.
-
-        :param filter: A mapping of key-value pairs that all
-            indexed job statepoints are compared against.
-        :type filter: Mapping
-        :param doc_filter: A mapping of key-value pairs that all
-            indexed job documents are compared against.
-        :yields: The ids of all indexed jobs matching both filters.
-        :raise TypeError: If the filters are not JSON serializable.
-        :raises ValueError: If the filters are invalid.
-        :raises RuntimeError: If the filters are not supported
-            by the index.
-        """
-        if filter:
-            filter = dict(self._resolve_statepoint_filter(filter))
-            if doc_filter:
@@ -550,10 +557,9 @@ def build_job_statepoint_index(self, exclude_const=False, index=None):
         """
         from .schema import _build_job_statepoint_index
         if index is None:
-            index = [{'_id': job._id, 'statepoint': job.sp()} for job in self]
-        for x, y in _build_job_statepoint_index(
-                jobs=self, exclude_const=exclude_const, index=index):
-            yield tuple(x.split('.')), y
+            index = [{'_id': job._id, 'sp': job.sp()} for job in self]
+        for x in _build_job_statepoint_index(jobs=self, exclude_const=exclude_const, index=index):
+            yield tuple(x.split('.'))

     def detect_schema(self, exclude_const=False, subset=None, index=None):
         """Detect the project's state point schema.
@@ -612,14 +618,15 @@ def _find_job_ids(self, filter=None, doc_filter=None, index=None):
         if filter is None and doc_filter is None and index is None:
             return list(self._job_dirs())
         if index is None:
-            if doc_filter is None:
-                index = self._sp_index()
-            else:
+            filter = dict(parse_filter(filter, 'sp'))
+            if doc_filter:
+                filter.update(parse_filter(doc_filter, 'doc'))
                 index = self.index(include_job_document=True)
-            search_index = JobSearchIndex(index, _trust=True)
-        else:
-            search_index = JobSearchIndex(index)
-        return search_index.find_job_ids(filter=filter, doc_filter=doc_filter)
+            elif 'doc' in _root_keys(filter):
+                index = self.index(include_job_document=True)
+            else:
+                index = self._sp_index()
+        return Collection(index, _trust=True)._find(filter)

     def find_jobs(self, filter=None, doc_filter=None):
         """Find all jobs in the project's workspace.
@@ -641,7 +648,10 @@ def find_jobs(self, filter=None, doc_filter=None):
         :raises RuntimeError: If the filters are not supported
             by the index.
         """
-        return JobsCursor(self, filter, doc_filter)
+        filter = dict(parse_filter(filter, 'sp'))
+        if doc_filter:
+            filter.update(parse_filter(doc_filter, 'doc'))
+        return JobsCursor(self, filter)

     def __iter__(self):
         return iter(self.find_jobs())
@@ -658,6 +668,14 @@ def groupby(self, key=None, default=None):
             for key, group in project.groupby('a'):
                 print(key, list(group))

+            # Group jobs by document value 'a'.
+            for key, group in project.groupby('doc.a'):
+                print(key, list(group))
+
+            # Group jobs by job.sp['a'] and job.document['b'].
+            for key, group in project.groupby(('a', 'doc.b')):
+                print(key, list(group))
+
             # Find jobs where job.sp['a'] is 1 and group them
             # by job.sp['b'] and job.sp['c'].
             for key, group in project.find_jobs({'a': 1}).groupby(('b', 'c')):
@@ -1228,7 +1246,7 @@ def repair(self, fn_statepoints=None, index=None, job_ids=None):
                 raise
         if index is not None:
             for doc in index:
-                self._sp_cache[doc['signac_id']] = doc['statepoint']
+                self._sp_cache[doc['signac_id']] = doc['sp']

         corrupted = []
         for job_id in job_ids:
@@ -1286,7 +1304,7 @@ def _sp_index(self):
         for _id in to_remove:
             del self._index_cache[_id]
         for _id in to_add:
-            self._index_cache[_id] = dict(statepoint=self._get_statepoint(_id), _id=_id)
+            self._index_cache[_id] = dict(sp=self._get_statepoint(_id), _id=_id)
         return self._index_cache.values()

     def _build_index(self, include_job_document=False):
@@ -1295,14 +1313,14 @@ def _build_index(self, include_job_document=False):
         """
         wd = self.workspace() if self.Job is Job else None
         for _id in self._find_job_ids():
-            doc = dict(_id=_id, statepoint=self._get_statepoint(_id))
+            doc = dict(_id=_id, sp=self._get_statepoint(_id))
             if include_job_document:
                 if wd is None:
-                    doc.update(self.open_job(id=_id).document)
+                    doc['doc'] = self.open_job(id=_id).document
                 else:  # use optimized path
                     try:
                         with open(os.path.join(wd, _id, self.Job.FN_DOCUMENT), 'rb') as file:
-                            doc.update(json.loads(file.read().decode()))
+                            doc['doc'] = json.loads(file.read().decode())
                     except IOError as error:
                         if error.errno != errno.ENOENT:
                             raise
@@ -1695,25 +1713,23 @@ class JobsCursor(object):
     """
     _use_pandas_for_html_repr = True  # toggle use of pandas for html repr

-    def __init__(self, project, filter, doc_filter):
+    def __init__(self, project, filter):
         self._project = project
         self._filter = filter
-        self._doc_filter = doc_filter

         # This private attribute allows us to implement the deprecated
         # next() method for this class.
         self._next_iter = None

     def __eq__(self, other):
-        return self._project == other._project and self._filter == other._filter\
-            and self._doc_filter == other._doc_filter
+        return self._project == other._project and self._filter == other._filter

     def __len__(self):
         # Highly performance critical code path!!
-        if self._filter or self._doc_filter:
+        if self._filter:
             # We use the standard function for determining job ids if and only if
             # any of the two filter is provided.
-            return len(self._project._find_job_ids(self._filter, self._doc_filter))
+            return len(self._project._find_job_ids(self._filter))
         else:
             # Without filter we can simply return the length of the whole project.
             return self._project.__len__()
@@ -1722,7 +1738,7 @@ def __iter__(self):
         # Code duplication here for improved performance.
         return _JobsCursorIterator(
             self._project,
-            self._project._find_job_ids(self._filter, self._doc_filter),
+            self._project._find_job_ids(self._filter)
         )

     def next(self):
@@ -1753,6 +1769,14 @@ def groupby(self, key=None, default=None):
             for key, group in project.groupby('a'):
                 print(key, list(group))

+            # Group jobs by document value 'a'.
+            for key, group in project.groupby('doc.a'):
+                print(key, list(group))
+
+            # Group jobs by job.sp['a'] and job.document['b'].
+            for key, group in project.groupby(('a', 'doc.b')):
+                print(key, list(group))
+
             # Find jobs where job.sp['a'] is 1 and group them
             # by job.sp['b'] and job.sp['c'].
             for key, group in project.find_jobs({'a': 1}).groupby(('b', 'c')):
@@ -1784,23 +1808,45 @@ def groupby(self, key=None, default=None):
                 else:
                     _filter = {'$and': [{key: {"$exists": True}}, _filter]}

-                def keyfunction(job):
-                    return job.sp[key]
+                if '.' in key and key.split('.', 1)[0] == 'doc':
+                    def keyfunction(job):
+                        return job.document[key[4:]]
+                else:
+                    key = key[3:] if '.' in key and key.split('.', 1)[0] == 'sp' else key
+
+                    def keyfunction(job):
+                        return job.sp[key]
             else:
-                def keyfunction(job):
-                    return job.sp.get(key, default)
+                if '.' in key and key.split('.', 1)[0] == 'doc':
+                    def keyfunction(job):
+                        return job.document.get(key[4:], default)
+                else:
+                    key = key[3:] if '.' in key and key.split('.', 1)[0] == 'sp' else key
+
+                    def keyfunction(job):
+                        return job.sp.get(key, default)

         elif isinstance(key, Iterable):
+            sp_keys = []
+            doc_keys = []
+            for k in key:
+                if '.' in k and k.split('.', 1)[0] == 'doc':
+                    doc_keys.append(k[4:])
+                else:
+                    sp_keys.append(k[3:] if '.' in k and k.split('.', 1)[0] == 'sp' else k)
+
             if default is None:
                 if _filter is None:
                     _filter = {k: {"$exists": True} for k in key}
                 else:
                     _filter = {'$and': [{k: {"$exists": True} for k in key}, _filter]}

                 def keyfunction(job):
-                    return tuple(job.sp[k] for k in key)
+                    return tuple([job.sp[k] for k in sp_keys] + [job.document[k] for k in doc_keys])
             else:
                 def keyfunction(job):
-                    return tuple(job.sp.get(k, default) for k in key)
+                    return tuple([job.sp.get(k, default) for k in sp_keys] +
+                                 [job.document.get(k, default) for k in doc_keys])
         elif key is None:
             # Must return a type that can be ordered with <, >
             def keyfunction(job):
@@ -1809,7 +1855,7 @@ def keyfunction(job):
             # Pass the job document to a callable
             keyfunction = key

-        return groupby(sorted(iter(JobsCursor(self._project, _filter, self._doc_filter)),
+        return groupby(sorted(iter(JobsCursor(self._project, _filter)),
                               key=keyfunction), key=keyfunction)

     def groupbydoc(self, key=None, default=None):
@@ -1910,11 +1956,10 @@ def _export_sp_and_doc(job):
                 orient='index').infer_objects()

     def __repr__(self):
-        return '{type}(project={project}, filter={filter}, doc_filter={doc_filter})'.format(
+        return '{type}(project={project}, filter={filter})'.format(
             type=self.__class__.__name__,
             project=repr(self._project),
-            filter=repr(self._filter),
-            doc_filter=repr(self._doc_filter))
+            filter=repr(self._filter))

     def _repr_html_jobs(self):
         html = ''
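
Taken together, the project.py changes make state point and document queries symmetric. Below is a minimal usage sketch against this branch; the project layout and the keys 'a' and 'b' are invented for illustration:

# Illustrative sketch only -- assumes an existing project whose jobs carry a
# state point key 'a' and a document key 'b'.
import signac

project = signac.get_project()

# One integrated filter: unprefixed keys query the state point,
# 'doc.'-prefixed keys query the job document.
jobs = project.find_jobs({'a': 1, 'doc.b': 2})

# The two-argument form still works; find_jobs folds doc_filter into the
# same integrated filter under the 'doc.' prefix.
jobs = project.find_jobs({'a': 1}, doc_filter={'b': 2})

# groupby accepts 'doc.'-prefixed (and explicit 'sp.'-prefixed) keys.
for key, group in project.groupby('doc.b'):
    print(key, len(list(group)))

# Grouping by a state point key and a document key at once.
for key, group in project.find_jobs().groupby(('a', 'doc.b')):
    print(key, len(list(group)))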