From f8967a7d3917ef4de9f163e523f7cef2e7c4f97e Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 27 Jan 2021 21:19:46 +0200 Subject: [PATCH 1/8] Fix cx panic when object and prefix present on bkt --- pkg/lib/gcp/gcs.go | 2 +- pkg/lib/slices/string.go | 10 ++++++++++ pkg/types/spec/errors.go | 16 ++++++++++++---- pkg/types/spec/utils.go | 20 ++++++++++++++++++++ 4 files changed, 43 insertions(+), 5 deletions(-) diff --git a/pkg/lib/gcp/gcs.go b/pkg/lib/gcp/gcs.go index a5cc8b66fa..0ee3991fd3 100644 --- a/pkg/lib/gcp/gcs.go +++ b/pkg/lib/gcp/gcs.go @@ -38,7 +38,7 @@ func SplitGCSPath(gcsPath string) (string, string, error) { if !IsValidGCSPath(gcsPath) { return "", "", ErrorInvalidGCSPath(gcsPath) } - fullPath := gcsPath[len("s3://"):] + fullPath := gcsPath[len("gs://"):] slashIndex := strings.Index(fullPath, "/") if slashIndex == -1 { return fullPath, "", nil diff --git a/pkg/lib/slices/string.go b/pkg/lib/slices/string.go index 0fef3a1b4d..5e509bd797 100644 --- a/pkg/lib/slices/string.go +++ b/pkg/lib/slices/string.go @@ -98,6 +98,16 @@ func RemoveEmptiesAndUnique(strs []string) []string { return out } +func RemoveIfHasString(strs []string, str string) []string { + var filteredStrs []string + for _, elem := range strs { + if elem != str { + filteredStrs = append(filteredStrs, elem) + } + } + return filteredStrs +} + func HasDuplicateStr(in []string) bool { keys := strset.New() for _, elem := range in { diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index dbb101cfc5..ff8241c221 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -55,10 +55,11 @@ const ( ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem" - ErrFileNotFound = "spec.file_not_found" - ErrDirIsEmpty = "spec.dir_is_empty" - ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path" - ErrPythonPathNotFound = "spec.python_path_not_found" + ErrFileNotFound = "spec.file_not_found" + ErrDirIsEmpty = "spec.dir_is_empty" + ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path" + ErrPythonPathNotFound = "spec.python_path_not_found" + ErrObjectConflictingWithPrefix = "spec.object_conflicting_with_prefix" ErrS3FileNotFound = "spec.s3_file_not_found" ErrS3DirNotFound = "spec.s3_dir_not_found" @@ -274,6 +275,13 @@ func ErrorPythonPathNotFound(pythonPath string) error { }) } +func ErrorObjectConflictingWithPrefix(path string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrObjectConflictingWithPrefix, + Message: fmt.Sprintf("%s: path prefix conflicts with object %s; remove object %s to be able to use %s as prefix for other objects", path, path, path, path), + }) +} + func ErrorS3FileNotFound(path string) error { return errors.WithStack(&errors.Error{ Kind: ErrS3FileNotFound, diff --git a/pkg/types/spec/utils.go b/pkg/types/spec/utils.go index 45503ae10a..fd7c901ffb 100644 --- a/pkg/types/spec/utils.go +++ b/pkg/types/spec/utils.go @@ -140,6 +140,7 @@ func validateDirModels( if err != nil { return nil, err } + dirPrefix = s.EnsureSuffix(dirPrefix, "/") s3Objects, err := awsClientForBucket.ListS3PathDir(modelPath, false, nil) if err != nil { @@ -152,6 +153,7 @@ func validateDirModels( if err != nil { return nil, err } + dirPrefix = s.EnsureSuffix(dirPrefix, "/") gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil) if err != nil { @@ -162,6 +164,12 @@ func validateDirModels( if len(modelDirPaths) == 0 { return nil, errorForPredictorType(dirPrefix, modelDirPaths) } + if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, dirPrefix)) { + return nil, ErrorObjectConflictingWithPrefix(dirPrefix) + } + if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, strings.TrimRight(dirPrefix, "/"))) { + return nil, ErrorObjectConflictingWithPrefix(strings.TrimRight(dirPrefix, "/")) + } modelNames := []string{} modelDirPathLength := len(slices.RemoveEmpties(strings.Split(dirPrefix, "/"))) @@ -179,7 +187,13 @@ func validateDirModels( } modelPrefix := filepath.Join(dirPrefix, modelName) + if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, modelPrefix)) { + return nil, ErrorObjectConflictingWithPrefix(modelPrefix) + } modelPrefix = s.EnsureSuffix(modelPrefix, "/") + if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, modelPrefix)) { + return nil, ErrorObjectConflictingWithPrefix(modelPrefix) + } modelStructureType := determineBaseModelStructure(modelDirPaths, modelPrefix) if modelStructureType == userconfig.UnknownModelStructureType { @@ -296,6 +310,12 @@ func validateModels( if len(modelPaths) == 0 { return nil, errors.Wrap(errorForPredictorType(modelPrefix, modelPaths), modelNameWrapStr) } + if len(modelPaths) != len(slices.RemoveIfHasString(modelPaths, modelPrefix)) { + return nil, ErrorObjectConflictingWithPrefix(modelPrefix) + } + if len(modelPaths) != len(slices.RemoveIfHasString(modelPaths, strings.TrimRight(modelPrefix, "/"))) { + return nil, ErrorObjectConflictingWithPrefix(strings.TrimRight(modelPrefix, "/")) + } modelStructureType := determineBaseModelStructure(modelPaths, modelPrefix) if modelStructureType == userconfig.UnknownModelStructureType { From f26030ebd62bac262f06d8f732c8d31126816dd1 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 27 Jan 2021 21:26:42 +0200 Subject: [PATCH 2/8] Misc change --- pkg/types/spec/utils.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/types/spec/utils.go b/pkg/types/spec/utils.go index fd7c901ffb..e4dcdc9f49 100644 --- a/pkg/types/spec/utils.go +++ b/pkg/types/spec/utils.go @@ -140,7 +140,6 @@ func validateDirModels( if err != nil { return nil, err } - dirPrefix = s.EnsureSuffix(dirPrefix, "/") s3Objects, err := awsClientForBucket.ListS3PathDir(modelPath, false, nil) if err != nil { @@ -153,7 +152,6 @@ func validateDirModels( if err != nil { return nil, err } - dirPrefix = s.EnsureSuffix(dirPrefix, "/") gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil) if err != nil { @@ -164,6 +162,7 @@ func validateDirModels( if len(modelDirPaths) == 0 { return nil, errorForPredictorType(dirPrefix, modelDirPaths) } + dirPrefix = s.EnsureSuffix(dirPrefix, "/") if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, dirPrefix)) { return nil, ErrorObjectConflictingWithPrefix(dirPrefix) } From 0b6e072e0643e6a149c34c326436355fa01ca4d4 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 3 Feb 2021 17:13:21 +0200 Subject: [PATCH 3/8] Revert "Fix cx panic when object and prefix present on bkt" This reverts commit f8967a7d3917ef4de9f163e523f7cef2e7c4f97e. # Conflicts: # pkg/types/spec/utils.go --- pkg/lib/gcp/gcs.go | 2 +- pkg/lib/slices/string.go | 10 ---------- pkg/types/spec/errors.go | 16 ++++------------ pkg/types/spec/utils.go | 19 ------------------- 4 files changed, 5 insertions(+), 42 deletions(-) diff --git a/pkg/lib/gcp/gcs.go b/pkg/lib/gcp/gcs.go index e631a141e9..45bb3283b6 100644 --- a/pkg/lib/gcp/gcs.go +++ b/pkg/lib/gcp/gcs.go @@ -38,7 +38,7 @@ func SplitGCSPath(gcsPath string) (string, string, error) { if !IsValidGCSPath(gcsPath) { return "", "", ErrorInvalidGCSPath(gcsPath) } - fullPath := gcsPath[len("gs://"):] + fullPath := gcsPath[len("s3://"):] slashIndex := strings.Index(fullPath, "/") if slashIndex == -1 { return fullPath, "", nil diff --git a/pkg/lib/slices/string.go b/pkg/lib/slices/string.go index 5e509bd797..0fef3a1b4d 100644 --- a/pkg/lib/slices/string.go +++ b/pkg/lib/slices/string.go @@ -98,16 +98,6 @@ func RemoveEmptiesAndUnique(strs []string) []string { return out } -func RemoveIfHasString(strs []string, str string) []string { - var filteredStrs []string - for _, elem := range strs { - if elem != str { - filteredStrs = append(filteredStrs, elem) - } - } - return filteredStrs -} - func HasDuplicateStr(in []string) bool { keys := strset.New() for _, elem := range in { diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index ff8241c221..dbb101cfc5 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -55,11 +55,10 @@ const ( ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem" - ErrFileNotFound = "spec.file_not_found" - ErrDirIsEmpty = "spec.dir_is_empty" - ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path" - ErrPythonPathNotFound = "spec.python_path_not_found" - ErrObjectConflictingWithPrefix = "spec.object_conflicting_with_prefix" + ErrFileNotFound = "spec.file_not_found" + ErrDirIsEmpty = "spec.dir_is_empty" + ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path" + ErrPythonPathNotFound = "spec.python_path_not_found" ErrS3FileNotFound = "spec.s3_file_not_found" ErrS3DirNotFound = "spec.s3_dir_not_found" @@ -275,13 +274,6 @@ func ErrorPythonPathNotFound(pythonPath string) error { }) } -func ErrorObjectConflictingWithPrefix(path string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrObjectConflictingWithPrefix, - Message: fmt.Sprintf("%s: path prefix conflicts with object %s; remove object %s to be able to use %s as prefix for other objects", path, path, path, path), - }) -} - func ErrorS3FileNotFound(path string) error { return errors.WithStack(&errors.Error{ Kind: ErrS3FileNotFound, diff --git a/pkg/types/spec/utils.go b/pkg/types/spec/utils.go index e4dcdc9f49..45503ae10a 100644 --- a/pkg/types/spec/utils.go +++ b/pkg/types/spec/utils.go @@ -162,13 +162,6 @@ func validateDirModels( if len(modelDirPaths) == 0 { return nil, errorForPredictorType(dirPrefix, modelDirPaths) } - dirPrefix = s.EnsureSuffix(dirPrefix, "/") - if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, dirPrefix)) { - return nil, ErrorObjectConflictingWithPrefix(dirPrefix) - } - if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, strings.TrimRight(dirPrefix, "/"))) { - return nil, ErrorObjectConflictingWithPrefix(strings.TrimRight(dirPrefix, "/")) - } modelNames := []string{} modelDirPathLength := len(slices.RemoveEmpties(strings.Split(dirPrefix, "/"))) @@ -186,13 +179,7 @@ func validateDirModels( } modelPrefix := filepath.Join(dirPrefix, modelName) - if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, modelPrefix)) { - return nil, ErrorObjectConflictingWithPrefix(modelPrefix) - } modelPrefix = s.EnsureSuffix(modelPrefix, "/") - if len(modelDirPaths) != len(slices.RemoveIfHasString(modelDirPaths, modelPrefix)) { - return nil, ErrorObjectConflictingWithPrefix(modelPrefix) - } modelStructureType := determineBaseModelStructure(modelDirPaths, modelPrefix) if modelStructureType == userconfig.UnknownModelStructureType { @@ -309,12 +296,6 @@ func validateModels( if len(modelPaths) == 0 { return nil, errors.Wrap(errorForPredictorType(modelPrefix, modelPaths), modelNameWrapStr) } - if len(modelPaths) != len(slices.RemoveIfHasString(modelPaths, modelPrefix)) { - return nil, ErrorObjectConflictingWithPrefix(modelPrefix) - } - if len(modelPaths) != len(slices.RemoveIfHasString(modelPaths, strings.TrimRight(modelPrefix, "/"))) { - return nil, ErrorObjectConflictingWithPrefix(strings.TrimRight(modelPrefix, "/")) - } modelStructureType := determineBaseModelStructure(modelPaths, modelPrefix) if modelStructureType == userconfig.UnknownModelStructureType { From 1c632e40bba177433b4f4710f8e4c196f9fe582c Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 3 Feb 2021 17:19:20 +0200 Subject: [PATCH 4/8] Ignore objects ending with "/" for GCS --- pkg/lib/gcp/gcs.go | 16 ++++++++++------ pkg/operator/config/wrappers.go | 2 +- pkg/types/spec/utils.go | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/lib/gcp/gcs.go b/pkg/lib/gcp/gcs.go index 45bb3283b6..014219c168 100644 --- a/pkg/lib/gcp/gcs.go +++ b/pkg/lib/gcp/gcs.go @@ -255,12 +255,12 @@ func ConvertGCSObjectsToKeys(gcsObjects ...*storage.ObjectAttrs) []string { return paths } -func (c *Client) ListGCSDir(bucket string, gcsDir string, maxResults *int64) ([]*storage.ObjectAttrs, error) { +func (c *Client) ListGCSDir(bucket string, gcsDir string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) { gcsDir = s.EnsureSuffix(gcsDir, "/") - return c.ListGCSPrefix(bucket, gcsDir, maxResults) + return c.ListGCSPrefix(bucket, gcsDir, includeDirObjects, maxResults) } -func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64) ([]*storage.ObjectAttrs, error) { +func (c *Client) ListGCSPrefix(bucket string, prefix string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) { gcsClient, err := c.GCS() if err != nil { return nil, err @@ -282,7 +282,11 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64) if attrs == nil { continue } - gcsObjects = append(gcsObjects, attrs) + + if !includeDirObjects && !strings.HasSuffix(attrs.Name, "/") { + gcsObjects = append(gcsObjects, attrs) + } + if maxResults != nil && int64(len(gcsObjects)) >= *maxResults { break } @@ -291,12 +295,12 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64) return gcsObjects, nil } -func (c *Client) ListGCSPathDir(gcsDirPath string, maxResults *int64) ([]*storage.ObjectAttrs, error) { +func (c *Client) ListGCSPathDir(gcsDirPath string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) { bucket, gcsDir, err := SplitGCSPath(gcsDirPath) if err != nil { return nil, err } - return c.ListGCSDir(bucket, gcsDir, maxResults) + return c.ListGCSDir(bucket, gcsDir, includeDirObjects, maxResults) } // This behaves like you'd expect `ls` to behave on a local file system diff --git a/pkg/operator/config/wrappers.go b/pkg/operator/config/wrappers.go index e2eb6c725c..cae09eb873 100644 --- a/pkg/operator/config/wrappers.go +++ b/pkg/operator/config/wrappers.go @@ -159,7 +159,7 @@ func ListBucketPrefix(prefix string, maxResults *int64) ([]*storage.ObjectAttrs, } return nil, s3Objects, nil case types.GCPProviderType: - gcsObjects, err := GCP.ListGCSPrefix(GCPCoreConfig.Bucket, prefix, maxResults) + gcsObjects, err := GCP.ListGCSPrefix(GCPCoreConfig.Bucket, prefix, false, maxResults) if err != nil { return nil, nil, err } diff --git a/pkg/types/spec/utils.go b/pkg/types/spec/utils.go index 45503ae10a..d5b9ffc7cc 100644 --- a/pkg/types/spec/utils.go +++ b/pkg/types/spec/utils.go @@ -153,7 +153,7 @@ func validateDirModels( return nil, err } - gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil) + gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, false, nil) if err != nil { return nil, err } @@ -287,7 +287,7 @@ func validateModels( } modelPrefix = s.EnsureSuffix(modelPrefix, "/") - gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil) + gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, false, nil) if err != nil { return nil, errors.Wrap(err, modelNameWrapStr) } From 26e6018af6ef3e2730023181b7973a787eee9838 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 3 Feb 2021 22:14:11 +0200 Subject: [PATCH 5/8] Ignore/include empty dirs for s3/gcs --- .../serve/cortex_internal/lib/storage/gcs.py | 30 ++++++++++++++----- .../serve/cortex_internal/lib/storage/s3.py | 26 ++++++++-------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py index d420e49568..a6b3ab65c6 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py @@ -57,15 +57,33 @@ def is_valid_gcs_path(path: str) -> bool: def _does_blob_exist(self, prefix: str) -> bool: return isinstance(self.gcs.get_blob(blob_name=prefix), storage.blob.Blob) + def _gcs_matching_blobs_generator(self, max_results=None, prefix="", include_dir_objects=False): + for blob in self.gcs.list_blobs(max_results, prefix=prefix): + if include_dir_objects or (not include_dir_objects and not blob.name.endswith("/")): + yield blob + def _is_gcs_dir(self, dir_path: str) -> bool: prefix = util.ensure_suffix(dir_path, "/") - return len(list(self.gcs.list_blobs(max_results=2, prefix=prefix))) > 1 - - def search(self, prefix: str = "") -> Tuple[List[str], List[datetime.datetime]]: + return ( + len( + list( + self._gcs_matching_blobs_generator( + max_results=2, prefix=prefix, include_dir_objects=True + ) + ) + ) + > 1 + ) + + def search( + self, prefix: str = "", include_dir_objects=False + ) -> Tuple[List[str], List[datetime.datetime]]: paths = [] timestamps = [] - for blob in self.gcs.list_blobs(prefix=prefix): + for blob in self._gcs_matching_blobs_generator( + prefix=prefix, include_dir_objects=include_dir_objects + ): paths.append(blob.name) timestamps.append(blob.updated) @@ -112,9 +130,7 @@ def download_dir(self, prefix: str, local_dir: str): def download_dir_contents(self, prefix: str, local_dir: str): util.mkdir_p(local_dir) prefix = util.ensure_suffix(prefix, "/") - for blob in self.gcs.list_blobs(prefix=prefix): - if blob.name.endswith("/"): - continue + for blob in self._gcs_matching_blobs_generator(prefix=prefix): relative_path = util.trim_prefix(blob.name, prefix) local_dest_path = os.path.join(local_dir, relative_path) self.download_file(blob.name, local_dest_path) diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/s3.py b/pkg/cortex/serve/cortex_internal/lib/storage/s3.py index ea7a28bdd4..ef530cabc5 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/s3.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/s3.py @@ -85,7 +85,7 @@ def _is_s3_dir(self, dir_path): prefix = util.ensure_suffix(dir_path, "/") return self._is_s3_prefix(prefix) - def _get_matching_s3_objects_generator(self, prefix="", suffix=""): + def _get_matching_s3_objects_generator(self, prefix="", suffix="", include_dir_objects=False): kwargs = {"Bucket": self.bucket, "Prefix": prefix} while True: @@ -97,7 +97,11 @@ def _get_matching_s3_objects_generator(self, prefix="", suffix=""): for obj in contents: key = obj["Key"] - if key.startswith(prefix) and key.endswith(suffix): + if ( + key.startswith(prefix) + and key.endswith(suffix) + and (include_dir_objects or (not include_dir_objects and not key.endswith("/"))) + ): yield obj try: @@ -105,8 +109,8 @@ def _get_matching_s3_objects_generator(self, prefix="", suffix=""): except KeyError: break - def _get_matching_s3_keys_generator(self, prefix="", suffix=""): - for obj in self._get_matching_s3_objects_generator(prefix, suffix): + def _get_matching_s3_keys_generator(self, prefix="", suffix="", include_dir_objects=False): + for obj in self._get_matching_s3_objects_generator(prefix, suffix, include_dir_objects): yield obj["Key"], obj["LastModified"] def put_object(self, body, key): @@ -146,18 +150,16 @@ def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None): return byte_array.strip() - def search(self, prefix="", suffix="") -> Tuple[List[str], List[datetime.datetime]]: + def search( + self, prefix="", suffix="", include_dir_objects=False + ) -> Tuple[List[str], List[datetime.datetime]]: paths = [] timestamps = [] timestamp_map = {} - for key, ts in self._get_matching_s3_keys_generator(prefix, suffix): - timestamp_map[key] = ts - - filtered_keys = util.remove_non_empty_directory_paths(list(timestamp_map.keys())) - for key in filtered_keys: + for key, ts in self._get_matching_s3_keys_generator(prefix, suffix, include_dir_objects): paths.append(key) - timestamps.append(timestamp_map[key]) + timestamps.append(ts) return paths, timestamps @@ -218,8 +220,6 @@ def download_dir_contents(self, prefix, local_dir): util.mkdir_p(local_dir) prefix = util.ensure_suffix(prefix, "/") for key, _ in self._get_matching_s3_keys_generator(prefix): - if key.endswith("/"): - continue rel_path = util.trim_prefix(key, prefix) local_dest_path = os.path.join(local_dir, rel_path) self.download_file(key, local_dest_path) From 09c8a5516f0848f74788b4b65d5d6986ef6e106d Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 5 Feb 2021 17:47:40 +0200 Subject: [PATCH 6/8] Address PR comments --- .../serve/cortex_internal/lib/storage/gcs.py | 15 ++++++--------- .../serve/cortex_internal/lib/storage/s3.py | 3 +-- pkg/lib/gcp/gcs.go | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py index a6b3ab65c6..89eafe3bad 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py @@ -59,22 +59,19 @@ def _does_blob_exist(self, prefix: str) -> bool: def _gcs_matching_blobs_generator(self, max_results=None, prefix="", include_dir_objects=False): for blob in self.gcs.list_blobs(max_results, prefix=prefix): - if include_dir_objects or (not include_dir_objects and not blob.name.endswith("/")): + if include_dir_objects or not blob.name.endswith("/"): yield blob def _is_gcs_dir(self, dir_path: str) -> bool: prefix = util.ensure_suffix(dir_path, "/") - return ( - len( - list( - self._gcs_matching_blobs_generator( - max_results=2, prefix=prefix, include_dir_objects=True - ) - ) + matching_blobs = list( + self._gcs_matching_blobs_generator( + max_results=2, prefix=prefix, include_dir_objects=True ) - > 1 ) + return len(matching_blobs) > 1 + def search( self, prefix: str = "", include_dir_objects=False ) -> Tuple[List[str], List[datetime.datetime]]: diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/s3.py b/pkg/cortex/serve/cortex_internal/lib/storage/s3.py index ef530cabc5..ac89e43f8d 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/s3.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/s3.py @@ -100,7 +100,7 @@ def _get_matching_s3_objects_generator(self, prefix="", suffix="", include_dir_o if ( key.startswith(prefix) and key.endswith(suffix) - and (include_dir_objects or (not include_dir_objects and not key.endswith("/"))) + and (include_dir_objects or not key.endswith("/")) ): yield obj @@ -156,7 +156,6 @@ def search( paths = [] timestamps = [] - timestamp_map = {} for key, ts in self._get_matching_s3_keys_generator(prefix, suffix, include_dir_objects): paths.append(key) timestamps.append(ts) diff --git a/pkg/lib/gcp/gcs.go b/pkg/lib/gcp/gcs.go index 014219c168..80316c4530 100644 --- a/pkg/lib/gcp/gcs.go +++ b/pkg/lib/gcp/gcs.go @@ -283,7 +283,7 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, includeDirObjects b continue } - if !includeDirObjects && !strings.HasSuffix(attrs.Name, "/") { + if includeDirObjects || !strings.HasSuffix(attrs.Name, "/") { gcsObjects = append(gcsObjects, attrs) } From 802fcadc12fcdce54041d66d7f5195c02a227a86 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Fri, 5 Feb 2021 22:29:43 +0200 Subject: [PATCH 7/8] Fix _is_gcs_dir private method --- pkg/cortex/serve/cortex_internal/lib/storage/gcs.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py index 89eafe3bad..3ec9390e09 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py @@ -64,13 +64,9 @@ def _gcs_matching_blobs_generator(self, max_results=None, prefix="", include_dir def _is_gcs_dir(self, dir_path: str) -> bool: prefix = util.ensure_suffix(dir_path, "/") - matching_blobs = list( - self._gcs_matching_blobs_generator( - max_results=2, prefix=prefix, include_dir_objects=True - ) - ) + matching_blobs = list(self._gcs_matching_blobs_generator(max_results=2, prefix=prefix)) - return len(matching_blobs) > 1 + return len(matching_blobs) > 0 def search( self, prefix: str = "", include_dir_objects=False From 68220ba4c43ed58fa1a350d2ef9c319a878ceb9e Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Mon, 15 Feb 2021 23:52:17 +0200 Subject: [PATCH 8/8] Change how directories are interpreted for AWS/GCP --- .../serve/cortex_internal/lib/storage/gcs.py | 14 +++++++++++--- pkg/cortex/serve/cortex_internal/lib/storage/s3.py | 8 ++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py index 3ec9390e09..8c96c35d73 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/gcs.py @@ -64,7 +64,11 @@ def _gcs_matching_blobs_generator(self, max_results=None, prefix="", include_dir def _is_gcs_dir(self, dir_path: str) -> bool: prefix = util.ensure_suffix(dir_path, "/") - matching_blobs = list(self._gcs_matching_blobs_generator(max_results=2, prefix=prefix)) + matching_blobs = list( + self._gcs_matching_blobs_generator( + max_results=2, prefix=prefix, include_dir_objects=True + ) + ) return len(matching_blobs) > 0 @@ -123,10 +127,14 @@ def download_dir(self, prefix: str, local_dir: str): def download_dir_contents(self, prefix: str, local_dir: str): util.mkdir_p(local_dir) prefix = util.ensure_suffix(prefix, "/") - for blob in self._gcs_matching_blobs_generator(prefix=prefix): + for blob in self._gcs_matching_blobs_generator(prefix=prefix, include_dir_objects=True): relative_path = util.trim_prefix(blob.name, prefix) local_dest_path = os.path.join(local_dir, relative_path) - self.download_file(blob.name, local_dest_path) + + if not local_dest_path.endswith("/"): + self.download_file(blob.name, local_dest_path) + else: + util.mkdir_p(os.path.dirname(local_dest_path)) def download(self, prefix: str, local_dir: str): """ diff --git a/pkg/cortex/serve/cortex_internal/lib/storage/s3.py b/pkg/cortex/serve/cortex_internal/lib/storage/s3.py index ac89e43f8d..24b24d34c8 100644 --- a/pkg/cortex/serve/cortex_internal/lib/storage/s3.py +++ b/pkg/cortex/serve/cortex_internal/lib/storage/s3.py @@ -218,10 +218,14 @@ def download_dir(self, prefix, local_dir): def download_dir_contents(self, prefix, local_dir): util.mkdir_p(local_dir) prefix = util.ensure_suffix(prefix, "/") - for key, _ in self._get_matching_s3_keys_generator(prefix): + for key, _ in self._get_matching_s3_keys_generator(prefix, include_dir_objects=True): rel_path = util.trim_prefix(key, prefix) local_dest_path = os.path.join(local_dir, rel_path) - self.download_file(key, local_dest_path) + + if not local_dest_path.endswith("/"): + self.download_file(key, local_dest_path) + else: + util.mkdir_p(os.path.dirname(local_dest_path)) def download_and_unzip(self, key, local_dir): util.mkdir_p(local_dir)