Implement feature to search jobs with a sp- and doc-integrated filter. #188
Changes from all commits: 6f75e51, 531c07e, 93a893d, d5a55c4, 99a5847
First changed file:
```diff
@@ -4,6 +4,7 @@
 from __future__ import print_function
 import sys
 from ..core import json
+from ..common import six


 def _print_err(msg=None):
@@ -71,17 +72,17 @@ def _cast(x):


 def _parse_simple(key, value=None):
     if value is None or value == '!':
-        return {key: {'$exists': True}}
+        return key, {'$exists': True}
     elif _is_json(value):
-        return {key: _parse_json(value)}
+        return key, _parse_json(value)
     elif _is_regex(value):
-        return {key: {'$regex': value[1:-1]}}
+        return key, {'$regex': value[1:-1]}
     elif _is_json(key):
         raise ValueError(
             "Please check your filter arguments. "
             "Using as JSON expression as key is not allowed: '{}'.".format(key))
     else:
-        return {key: _cast(value)}
+        return key, _cast(value)


 def parse_filter_arg(args, file=sys.stderr):
```
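With this change, `_parse_simple` yields a `(key, value)` pair instead of a single-entry dict, so callers can collect results directly with `dict()`. A hedged illustration of the expected outputs, based only on the branches visible above (the `_cast`, `_is_json`, and `_is_regex` helpers are not shown in the diff, and the import path is an assumption based on the package layout):

```python
# Illustration only: expected (key, value) pairs based on the branches shown above.
# The import path is an assumption.
from signac.contrib.filterparse import _parse_simple

print(_parse_simple('a'))                # expected: ('a', {'$exists': True})
print(_parse_simple('a', '{"$lt": 5}'))  # expected: ('a', {'$lt': 5})
print(_parse_simple('a', '/foo.*/'))     # expected: ('a', {'$regex': 'foo.*'})
print(_parse_simple('a', '42'))          # expected: ('a', 42), assuming _cast converts numerals
```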
```diff
@@ -91,14 +92,65 @@ def parse_filter_arg(args, file=sys.stderr):
         if _is_json(args[0]):
             return _parse_json(args[0])
         else:
-            return _with_message(_parse_simple(args[0]), file)
+            key, value = _parse_simple(args[0])
+            return _with_message({key: value}, file)
     else:
-        q = dict()
-        for i in range(0, len(args), 2):
-            key = args[i]
-            if i+1 < len(args):
-                value = args[i+1]
-            else:
-                value = None
-            q.update(_parse_simple(key, value))
+        q = dict(parse_simple(args))
         return _with_message(q, file)


+def parse_simple(tokens):
+    for i in range(0, len(tokens), 2):
+        key = tokens[i]
+        if i+1 < len(tokens):
+            value = tokens[i+1]
+        else:
+            value = None
+        yield _parse_simple(key, value)


+def _add_prefix(filter, prefix):
+    for key, value in filter:
+        if key in ('$and', '$or'):
+            if isinstance(value, list) or isinstance(value, tuple):
+                yield key, [dict(_add_prefix(item.items(), prefix)) for item in value]
+            else:
+                raise ValueError(
+                    "The argument to a logical operator must be a sequence (e.g. a list)!")
+        elif '.' in key and key.split('.', 1)[0] in ('sp', 'doc'):
+            yield key, value
+        elif key in ('sp', 'doc'):
+            yield key, value
```
Contributor: What is the purpose of this branch? On a related note, should we explicitly disallow …

Contributor (Author): For these cases, we need to ensure that we do not add 'sp' or 'doc' as prefixes, for instance with a filter that is expressed like this: … Concerning your second point, …

Contributor: I didn't consider that using …
```diff
+        else:
+            yield prefix + '.' + key, value
```
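To make the pass-through behavior discussed in the thread above concrete, here is a small illustration of how `_add_prefix` treats a few filters. The expected outputs follow directly from the branches shown in the diff; the import path is again an assumption:

```python
# Illustration only; assumes the module is importable as signac.contrib.filterparse.
from signac.contrib.filterparse import _add_prefix

print(dict(_add_prefix({'a': 1}.items(), 'sp')))
# expected: {'sp.a': 1}   (bare keys receive the prefix)
print(dict(_add_prefix({'doc.b': 2, 'sp': {'a': 1}}.items(), 'sp')))
# expected: {'doc.b': 2, 'sp': {'a': 1}}   (keys rooted at 'sp'/'doc' pass through unchanged)
print(dict(_add_prefix({'$or': [{'a': 1}, {'doc.b': 2}]}.items(), 'sp')))
# expected: {'$or': [{'sp.a': 1}, {'doc.b': 2}]}   (logical operators recurse into their clauses)
```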
```diff
+def _root_keys(filter):
+    for key, value in filter.items():
+        if key in ('$and', '$or'):
+            assert isinstance(value, (list, tuple))
+            for item in value:
+                for key in _root_keys(item):
+                    yield key
+        elif '.' in key:
+            yield key.split('.', 1)[0]
+        else:
+            yield key
```
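`_root_keys` is what later lets `find_job_ids` decide whether the job documents need to be loaded into the index at all. A brief illustration (same import-path assumption as above):

```python
# Illustration only; the import path is an assumption.
from signac.contrib.filterparse import _root_keys

print(set(_root_keys({'sp.a': 1, 'doc.b': 2})))
# expected: {'sp', 'doc'}
print('doc' in _root_keys({'$or': [{'sp.a': 1}, {'sp.b': 2}]}))
# expected: False, so a pure state-point filter never triggers loading the documents
```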
```diff
+def _parse_filter(filter):
+    if isinstance(filter, six.string_types):
+        # yield from parse_simple(filter.split())  # TODO: After dropping Py27.
+        for key, value in parse_simple(filter.split()):
+            yield key, value
+    elif filter:
+        # yield from filter.items()  # TODO: After dropping Py27.
+        for key, value in filter.items():
+            yield key, value


+def parse_filter(filter, prefix='sp'):
+    # yield from _add_prefix(_parse_filter(filter), prefix)  # TODO: After dropping Py27.
+    for key, value in _add_prefix(_parse_filter(filter), prefix):
+        yield key, value
```
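Taken together, `parse_filter` accepts either a mapping or a plain token string and normalizes both into prefixed keys, for example (import path and numeric casting behavior are assumptions):

```python
# Illustration only; import path and numeric casting behavior are assumptions.
from signac.contrib.filterparse import parse_filter

print(dict(parse_filter('a 1 doc.b 2')))
# expected: {'sp.a': 1, 'doc.b': 2}
print(dict(parse_filter({'b': 2}, prefix='doc')))
# expected: {'doc.b': 2}
```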
Second changed file:
```diff
@@ -33,6 +33,7 @@
 from .errors import WorkspaceError
 from .errors import DestinationExistsError
 from .errors import JobsCorruptedError
+from .filterparse import parse_filter, _root_keys
 if six.PY2:
     from collections import Mapping, Iterable
 else:
@@ -77,17 +78,7 @@ def __init__(self, index, _trust=False):
     def __len__(self):
         return len(self._collection)

-    def _resolve_statepoint_filter(self, q):
-        for k, v in q.items():
-            if k in ('$and', '$or'):
-                if not isinstance(v, list) or isinstance(v, tuple):
-                    raise ValueError(
-                        "The argument to a logical operator must be a sequence (e.g. a list)!")
-                yield k, [dict(self._resolve_statepoint_filter(i)) for i in v]
-            else:
-                yield 'statepoint.{}'.format(k), v
-
-    def find_job_ids(self, filter=None, doc_filter=None):
+    def find_job_ids(self, filter=None):
         """Find the job_ids of all jobs matching the filters.

         The optional filter arguments must be a Mapping of key-value
@@ -104,12 +95,6 @@ def find_job_ids(self, filter=None, doc_filter=None):
         :raises RuntimeError: If the filters are not supported
             by the index.
         """
-        if filter:
-            filter = dict(self._resolve_statepoint_filter(filter))
-            if doc_filter:
-                filter.update(doc_filter)
-        elif doc_filter:
-            filter = doc_filter
         return self._collection._find(filter)
@@ -482,7 +467,7 @@ def build_job_statepoint_index(self, exclude_const=False, index=None):
         """
         from .schema import _build_job_statepoint_index
         if index is None:
-            index = [{'_id': job._id, 'statepoint': job.sp()} for job in self]
+            index = [{'_id': job._id, 'sp': job.sp()} for job in self]
         for x in _build_job_statepoint_index(jobs=self, exclude_const=exclude_const, index=index):
             yield x
```
```diff
@@ -536,14 +521,15 @@ def find_job_ids(self, filter=None, doc_filter=None, index=None):
         if filter is None and doc_filter is None and index is None:
             return list(self._job_dirs())
         if index is None:
-            if doc_filter is None:
-                index = self._sp_index()
-            else:
+            filter = dict(parse_filter(filter, 'sp'))
+            if doc_filter:
+                filter.update(parse_filter(doc_filter, 'doc'))
                 index = self.index(include_job_document=True)
-            search_index = JobSearchIndex(index, _trust=True)
-        else:
-            search_index = JobSearchIndex(index)
-        return search_index.find_job_ids(filter=filter, doc_filter=doc_filter)
+            elif 'doc' in _root_keys(filter):
+                index = self.index(include_job_document=True)
+            else:
+                index = self._sp_index()
+        return Collection(index, _trust=True)._find(filter)

     def find_jobs(self, filter=None, doc_filter=None):
         """Find all jobs in the project's workspace.
@@ -565,7 +551,10 @@ def find_jobs(self, filter=None, doc_filter=None):
         :raises RuntimeError: If the filters are not supported
             by the index.
         """
-        return JobsCursor(self, filter, doc_filter)
+        filter = dict(parse_filter(filter, 'sp'))
+        if doc_filter:
+            filter.update(parse_filter(doc_filter, 'doc'))
+        return JobsCursor(self, filter)

     def __iter__(self):
         return iter(self.find_jobs())
```
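In effect, both entry points now normalize everything into a single filter with `sp.`/`doc.` prefixed keys before querying the index, and the job documents are only loaded when the filter actually touches a `doc` root key. A hedged illustration of the resulting equivalences (the project handle and state-point/document keys below are invented for the example):

```python
# Illustration only; 'project' is assumed to be an existing signac project.
import signac

project = signac.get_project()

# These three calls should now select the same set of jobs:
jobs_a = project.find_jobs({'a': 1}, doc_filter={'converged': True})
jobs_b = project.find_jobs({'a': 1, 'doc.converged': True})
jobs_c = project.find_jobs({'sp.a': 1, 'doc.converged': True})

# A pure state-point filter keeps using the cheaper state-point index,
# since _root_keys finds no 'doc' root key in it:
ids = project.find_job_ids({'a': 1})
```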
```diff
@@ -1133,7 +1122,7 @@ def repair(self, fn_statepoints=None, index=None, job_ids=None):
                     raise
         if index is not None:
             for doc in index:
-                self._sp_cache[doc['signac_id']] = doc['statepoint']
+                self._sp_cache[doc['signac_id']] = doc['sp']

         corrupted = []
         for job_id in job_ids:
@@ -1188,21 +1177,21 @@ def _sp_index(self):
         for _id in to_remove:
             del self._index_cache[_id]
         for _id in to_add:
-            self._index_cache[_id] = dict(statepoint=self.get_statepoint(_id), _id=_id)
+            self._index_cache[_id] = dict(sp=self.get_statepoint(_id), _id=_id)
         return self._index_cache.values()

     def _build_index(self, include_job_document=False):
         "Return a basic state point index."
         wd = self.workspace() if self.Job is Job else None
         for _id in self.find_job_ids():
-            doc = dict(_id=_id, statepoint=self.get_statepoint(_id))
+            doc = dict(_id=_id, sp=self.get_statepoint(_id))
             if include_job_document:
                 if wd is None:
-                    doc.update(self.open_job(id=_id).document)
+                    doc['doc'] = self.open_job(id=_id).document
                 else:  # use optimized path
                     try:
                         with open(os.path.join(wd, _id, self.Job.FN_DOCUMENT), 'rb') as file:
-                            doc.update(json.loads(file.read().decode()))
+                            doc['doc'] = json.loads(file.read().decode())
                     except IOError as error:
                         if error.errno != errno.ENOENT:
                             raise
```
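For reference, after this change an index entry nests the state point under `sp` and the job document under `doc`, instead of using a `statepoint` key and merging the document into the top level. Roughly (all values below are made up for illustration):

```python
# Illustrative shape of a single index document after this change (values invented).
index_entry = {
    '_id': '9bfd29df07674bc4aa960cf661b5acd2',  # job id (example value)
    'sp': {'a': 1, 'b': 2},                     # state point, previously stored under 'statepoint'
    'doc': {'converged': True},                 # job document, previously merged into the top level
}
```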
```diff
@@ -1306,7 +1295,7 @@ def _read_cache(self):
         return cache

     def index(self, formats=None, depth=0,
-              skip_errors=False, include_job_document=True):
+              skip_errors=False, include_job_document=True, **kwargs):
```
Contributor: Do we actually need to support arbitrary kwargs here?

Contributor (Author): No, that is an error. I was trying to introduce backwards compatibility here, and I think we should probably still do that in the long run, but as of right now, the …
```diff
         r"""Generate an index of the project's workspace.

         This generator function indexes every file in the project's
```
```diff
@@ -1598,21 +1587,23 @@ class JobsCursor(object):
     """
     _use_pandas_for_html_repr = True  # toggle use of pandas for html repr

-    def __init__(self, project, filter, doc_filter):
+    def __init__(self, project, filter):
         self._project = project
         self._filter = filter
-        self._doc_filter = doc_filter

         # This private attribute allows us to implement the deprecated
         # next() method for this class.
         self._next_iter = None

+    def __eq__(self, other):
+        return self._project == other._project and self._filter == other._filter
+
     def __len__(self):
         # Highly performance critical code path!!
-        if self._filter or self._doc_filter:
+        if self._filter:
             # We use the standard function for determining job ids if and only if
             # any of the two filter is provided.
-            return len(self._project.find_job_ids(self._filter, self._doc_filter))
+            return len(self._project.find_job_ids(self._filter))
         else:
             # Without filter we can simply return the length of the whole project.
             return self._project.__len__()
@@ -1621,7 +1612,7 @@ def __iter__(self):
         # Code duplication here for improved performance.
         return _JobsCursorIterator(
             self._project,
-            self._project.find_job_ids(self._filter, self._doc_filter),
+            self._project.find_job_ids(self._filter)
         )

     def next(self):
@@ -1799,12 +1790,10 @@ def _export_sp_and_doc(job):
             orient='index').infer_objects()

     def __repr__(self):
-        return "{type}({{'project': '{project}', 'filter': '{filter}',"\
-            " 'docfilter': '{doc_filter}'}})".format(
+        return "{type}({{'project': '{project}', 'filter': '{filter}'}})".format(
             type=self.__class__.__module__ + '.' + self.__class__.__name__,
             project=self._project,
-            filter=self._filter,
-            doc_filter=self._doc_filter)
+            filter=self._filter)

     def _repr_html_jobs(self):
         html = ''
```
Contributor: I would prefer to keep this "private" by renaming it to `_parse_simple` and renaming the current `_parse_simple` to `_parse_simple_single` or so. I recognize that you probably did this to support importing into other modules where it's used, but I don't think we need to bloat the API.

Contributor (Author): I'm ok with the name change; however, I was actually hoping to expose this function publicly, to make it easier to implement custom user scripts like this: …

Contributor: I support making this a public method / official API; if it's not public then `signac-dashboard` has to rely on a private method.

Contributor: Got it, in that case I support making it part of the API as well. I still like renaming the `_parse_simple` method to indicate that it operates on one filter component at a time, but we don't have to change the `parse_simple` method.
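The snippet referenced in the thread above is not shown here; a hypothetical example of the kind of user script that a public `parse_simple` would enable (import path and token format are assumptions) might look like this:

```python
# Hypothetical user script; the import path and token format are assumptions.
import sys
import signac
from signac.contrib.filterparse import parse_simple

project = signac.get_project()
# e.g. invoked as: python select_jobs.py a 42 doc.converged true
query = dict(parse_simple(sys.argv[1:]))
for job in project.find_jobs(query):
    print(job)
```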