88import uuid
99import hashlib
1010import json
11- from typing import Any , Callable , Dict , List , Text , Tuple , Union , cast
11+ import copy
12+ from typing import Any , Callable , Dict , List , Text , Tuple , Union , cast , Iterable
1213
1314import requests .sessions
1415from six import itervalues , string_types
2324
2425from . import process , update
2526from .errors import WorkflowException
26- from .process import Process , shortname
27+ from .process import Process , shortname , get_schema
2728from .update import ALLUPDATES
2829
2930_logger = logging .getLogger ("cwltool" )
3031
3132jobloaderctx = {
3233 u"cwl" : "https://w3id.org/cwl/cwl#" ,
34+ u"cwltool" : "http://commonwl.org/cwltool#" ,
3335 u"path" : {u"@type" : u"@id" },
3436 u"location" : {u"@type" : u"@id" },
3537 u"format" : {u"@type" : u"@id" },
3638 u"id" : u"@id"
3739}
3840
41+
42+ overrides_ctx = {
43+ u"overrideTarget" : {u"@type" : u"@id" },
44+ u"cwltool" : "http://commonwl.org/cwltool#" ,
45+ u"overrides" : {
46+ "@id" : "cwltool:overrides" ,
47+ "mapSubject" : "overrideTarget" ,
48+ "mapPredicate" : "override"
49+ },
50+ u"override" : {
51+ "@id" : "cwltool:override" ,
52+ "mapSubject" : "class"
53+ }
54+ } # type: Dict[Text, Union[Dict[Any, Any], Text, Iterable[Text]]]
55+
56+ def resolve_tool_uri (argsworkflow , # type: Text
57+ resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
58+ fetcher_constructor = None ,
59+ # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
60+ document_loader = None # type: Loader
61+ ):
62+ # type: (...) -> Tuple[Text, Text]
63+
64+ uri = None # type: Text
65+ split = urllib .parse .urlsplit (argsworkflow )
66+ # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
67+ if split .scheme and split .scheme in [u'http' ,u'https' ,u'file' ]:
68+ uri = argsworkflow
69+ elif os .path .exists (os .path .abspath (argsworkflow )):
70+ uri = file_uri (str (os .path .abspath (argsworkflow )))
71+ elif resolver :
72+ if document_loader is None :
73+ document_loader = Loader (jobloaderctx , fetcher_constructor = fetcher_constructor ) # type: ignore
74+ uri = resolver (document_loader , argsworkflow )
75+
76+ if uri is None :
77+ raise ValidationException ("Not found: '%s'" % argsworkflow )
78+
79+ if argsworkflow != uri :
80+ _logger .info ("Resolved '%s' to '%s'" , argsworkflow , uri )
81+
82+ fileuri = urllib .parse .urldefrag (uri )[0 ]
83+ return uri , fileuri
84+
85+
3986def fetch_document (argsworkflow , # type: Union[Text, Dict[Text, Any]]
4087 resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
4188 fetcher_constructor = None
@@ -49,22 +96,7 @@ def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
4996 uri = None # type: Text
5097 workflowobj = None # type: CommentedMap
5198 if isinstance (argsworkflow , string_types ):
52- split = urllib .parse .urlsplit (argsworkflow )
53- # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that
54- if split .scheme and split .scheme in [u'http' ,u'https' ,u'file' ]:
55- uri = argsworkflow
56- elif os .path .exists (os .path .abspath (argsworkflow )):
57- uri = file_uri (str (os .path .abspath (argsworkflow )))
58- elif resolver :
59- uri = resolver (document_loader , argsworkflow )
60-
61- if uri is None :
62- raise ValidationException ("Not found: '%s'" % argsworkflow )
63-
64- if argsworkflow != uri :
65- _logger .info ("Resolved '%s' to '%s'" , argsworkflow , uri )
66-
67- fileuri = urllib .parse .urldefrag (uri )[0 ]
99+ uri , fileuri = resolve_tool_uri (argsworkflow , resolver = resolver , document_loader = document_loader )
68100 workflowobj = document_loader .fetch (fileuri )
69101 elif isinstance (argsworkflow , dict ):
70102 uri = "#" + Text (id (argsworkflow ))
@@ -139,8 +171,9 @@ def validate_document(document_loader, # type: Loader
139171 strict = True , # type: bool
140172 preprocess_only = False , # type: bool
141173 fetcher_constructor = None ,
142- skip_schemas = None
174+ skip_schemas = None ,
143175 # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
176+ overrides = None # type: List[Dict]
144177 ):
145178 # type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
146179 """Validate a CWL document."""
@@ -155,9 +188,15 @@ def validate_document(document_loader, # type: Loader
155188
156189 jobobj = None
157190 if "cwl:tool" in workflowobj :
158- jobobj , _ = document_loader .resolve_all (workflowobj , uri )
191+ job_loader = Loader (jobloaderctx , fetcher_constructor = fetcher_constructor ) # type: ignore
192+ jobobj , _ = job_loader .resolve_all (workflowobj , uri )
159193 uri = urllib .parse .urljoin (uri , workflowobj ["https://w3id.org/cwl/cwl#tool" ])
160194 del cast (dict , jobobj )["https://w3id.org/cwl/cwl#tool" ]
195+
196+ if "http://commonwl.org/cwltool#overrides" in jobobj :
197+ overrides .extend (resolve_overrides (jobobj , uri , uri ))
198+ del jobobj ["http://commonwl.org/cwltool#overrides" ]
199+
161200 workflowobj = fetch_document (uri , fetcher_constructor = fetcher_constructor )[1 ]
162201
163202 fileuri = urllib .parse .urldefrag (uri )[0 ]
@@ -225,6 +264,9 @@ def validate_document(document_loader, # type: Loader
225264 if jobobj :
226265 metadata [u"cwl:defaults" ] = jobobj
227266
267+ if overrides :
268+ metadata [u"cwltool:overrides" ] = overrides
269+
228270 return document_loader , avsc_names , processobj , metadata , uri
229271
230272
@@ -277,14 +319,29 @@ def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
277319 enable_dev = False , # type: bool
278320 strict = True , # type: bool
279321 resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
280- fetcher_constructor = None # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
322+ fetcher_constructor = None , # type: Callable[[Dict[Text, Text], requests.sessions.Session], Fetcher]
323+ overrides = None
281324 ):
282325 # type: (...) -> Process
283326
284327 document_loader , workflowobj , uri = fetch_document (argsworkflow , resolver = resolver ,
285328 fetcher_constructor = fetcher_constructor )
286329 document_loader , avsc_names , processobj , metadata , uri = validate_document (
287330 document_loader , workflowobj , uri , enable_dev = enable_dev ,
288- strict = strict , fetcher_constructor = fetcher_constructor )
331+ strict = strict , fetcher_constructor = fetcher_constructor ,
332+ overrides = overrides )
289333 return make_tool (document_loader , avsc_names , metadata , uri ,
290334 makeTool , kwargs if kwargs else {})
335+
336+ def resolve_overrides (ov , ov_uri , baseurl ): # type: (CommentedMap, Text, Text) -> List[Dict[Text, Any]]
337+ ovloader = Loader (overrides_ctx )
338+ ret , _ = ovloader .resolve_all (ov , baseurl )
339+ if not isinstance (ret , CommentedMap ):
340+ raise Exception ("Expected CommentedMap, got %s" % type (ret ))
341+ cwl_docloader = get_schema ("v1.0" )[0 ]
342+ cwl_docloader .resolve_all (ret , ov_uri )
343+ return ret ["overrides" ]
344+
345+ def load_overrides (ov , base_url ): # type: (Text, Text) -> List[Dict[Text, Any]]
346+ ovloader = Loader (overrides_ctx )
347+ return resolve_overrides (ovloader .fetch (ov ), ov , base_url )
0 commit comments