Skip to content

Fix cortex deploy panic when object and prefix present on bucket #1830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f8967a7
Fix cx panic when object and prefix present on bkt
RobertLucian Jan 27, 2021
f26030e
Misc change
RobertLucian Jan 27, 2021
d7f9356
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Jan 30, 2021
f193c94
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 3, 2021
a1c3044
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 3, 2021
0b6e072
Revert "Fix cx panic when object and prefix present on bkt"
RobertLucian Feb 3, 2021
1c632e4
Ignore objects ending with "/" for GCS
RobertLucian Feb 3, 2021
26e6018
Ignore/include empty dirs for s3/gcs
RobertLucian Feb 3, 2021
ee2fffa
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 3, 2021
65282cf
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 4, 2021
74c530e
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 5, 2021
09c8a55
Address PR comments
RobertLucian Feb 5, 2021
ef1491f
Merge branch 'master' into fix/buckets-on-deploy
deliahu Feb 5, 2021
802fcad
Fix _is_gcs_dir private method
RobertLucian Feb 5, 2021
58aeecb
Merge branch 'master' into fix/buckets-on-deploy
deliahu Feb 8, 2021
cf6e20f
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 9, 2021
a35e2b3
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 9, 2021
57ef92e
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 9, 2021
8d4dafb
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 10, 2021
e9da812
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 12, 2021
8404925
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 15, 2021
6d5c704
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 15, 2021
68220ba
Change how directories are interpreted for AWS/GCP
RobertLucian Feb 15, 2021
faf523f
Merge branch 'master' into fix/buckets-on-deploy
RobertLucian Feb 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions pkg/cortex/serve/cortex_internal/lib/storage/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,30 @@ def is_valid_gcs_path(path: str) -> bool:
def _does_blob_exist(self, prefix: str) -> bool:
return isinstance(self.gcs.get_blob(blob_name=prefix), storage.blob.Blob)

def _gcs_matching_blobs_generator(self, max_results=None, prefix="", include_dir_objects=False):
for blob in self.gcs.list_blobs(max_results, prefix=prefix):
if include_dir_objects or not blob.name.endswith("/"):
yield blob

def _is_gcs_dir(self, dir_path: str) -> bool:
prefix = util.ensure_suffix(dir_path, "/")
return len(list(self.gcs.list_blobs(max_results=2, prefix=prefix))) > 1
matching_blobs = list(
self._gcs_matching_blobs_generator(
max_results=2, prefix=prefix, include_dir_objects=True
)
)

return len(matching_blobs) > 0

def search(self, prefix: str = "") -> Tuple[List[str], List[datetime.datetime]]:
def search(
self, prefix: str = "", include_dir_objects=False
) -> Tuple[List[str], List[datetime.datetime]]:
paths = []
timestamps = []

for blob in self.gcs.list_blobs(prefix=prefix):
for blob in self._gcs_matching_blobs_generator(
prefix=prefix, include_dir_objects=include_dir_objects
):
paths.append(blob.name)
timestamps.append(blob.updated)

Expand Down Expand Up @@ -112,12 +127,14 @@ def download_dir(self, prefix: str, local_dir: str):
def download_dir_contents(self, prefix: str, local_dir: str):
util.mkdir_p(local_dir)
prefix = util.ensure_suffix(prefix, "/")
for blob in self.gcs.list_blobs(prefix=prefix):
if blob.name.endswith("/"):
continue
for blob in self._gcs_matching_blobs_generator(prefix=prefix, include_dir_objects=True):
relative_path = util.trim_prefix(blob.name, prefix)
local_dest_path = os.path.join(local_dir, relative_path)
self.download_file(blob.name, local_dest_path)

if not local_dest_path.endswith("/"):
self.download_file(blob.name, local_dest_path)
else:
util.mkdir_p(os.path.dirname(local_dest_path))

def download(self, prefix: str, local_dir: str):
"""
Expand Down
35 changes: 19 additions & 16 deletions pkg/cortex/serve/cortex_internal/lib/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def _is_s3_dir(self, dir_path):
prefix = util.ensure_suffix(dir_path, "/")
return self._is_s3_prefix(prefix)

def _get_matching_s3_objects_generator(self, prefix="", suffix=""):
def _get_matching_s3_objects_generator(self, prefix="", suffix="", include_dir_objects=False):
kwargs = {"Bucket": self.bucket, "Prefix": prefix}

while True:
Expand All @@ -97,16 +97,20 @@ def _get_matching_s3_objects_generator(self, prefix="", suffix=""):

for obj in contents:
key = obj["Key"]
if key.startswith(prefix) and key.endswith(suffix):
if (
key.startswith(prefix)
and key.endswith(suffix)
and (include_dir_objects or not key.endswith("/"))
):
yield obj

try:
kwargs["ContinuationToken"] = resp["NextContinuationToken"]
except KeyError:
break

def _get_matching_s3_keys_generator(self, prefix="", suffix=""):
for obj in self._get_matching_s3_objects_generator(prefix, suffix):
def _get_matching_s3_keys_generator(self, prefix="", suffix="", include_dir_objects=False):
for obj in self._get_matching_s3_objects_generator(prefix, suffix, include_dir_objects):
yield obj["Key"], obj["LastModified"]

def put_object(self, body, key):
Expand Down Expand Up @@ -146,18 +150,15 @@ def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None):

return byte_array.strip()

def search(self, prefix="", suffix="") -> Tuple[List[str], List[datetime.datetime]]:
def search(
self, prefix="", suffix="", include_dir_objects=False
) -> Tuple[List[str], List[datetime.datetime]]:
paths = []
timestamps = []

timestamp_map = {}
for key, ts in self._get_matching_s3_keys_generator(prefix, suffix):
timestamp_map[key] = ts

filtered_keys = util.remove_non_empty_directory_paths(list(timestamp_map.keys()))
for key in filtered_keys:
for key, ts in self._get_matching_s3_keys_generator(prefix, suffix, include_dir_objects):
paths.append(key)
timestamps.append(timestamp_map[key])
timestamps.append(ts)

return paths, timestamps

Expand Down Expand Up @@ -217,12 +218,14 @@ def download_dir(self, prefix, local_dir):
def download_dir_contents(self, prefix, local_dir):
util.mkdir_p(local_dir)
prefix = util.ensure_suffix(prefix, "/")
for key, _ in self._get_matching_s3_keys_generator(prefix):
if key.endswith("/"):
continue
for key, _ in self._get_matching_s3_keys_generator(prefix, include_dir_objects=True):
rel_path = util.trim_prefix(key, prefix)
local_dest_path = os.path.join(local_dir, rel_path)
self.download_file(key, local_dest_path)

if not local_dest_path.endswith("/"):
self.download_file(key, local_dest_path)
else:
util.mkdir_p(os.path.dirname(local_dest_path))

def download_and_unzip(self, key, local_dir):
util.mkdir_p(local_dir)
Expand Down
16 changes: 10 additions & 6 deletions pkg/lib/gcp/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,12 @@ func ConvertGCSObjectsToKeys(gcsObjects ...*storage.ObjectAttrs) []string {
return paths
}

func (c *Client) ListGCSDir(bucket string, gcsDir string, maxResults *int64) ([]*storage.ObjectAttrs, error) {
func (c *Client) ListGCSDir(bucket string, gcsDir string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) {
gcsDir = s.EnsureSuffix(gcsDir, "/")
return c.ListGCSPrefix(bucket, gcsDir, maxResults)
return c.ListGCSPrefix(bucket, gcsDir, includeDirObjects, maxResults)
}

func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64) ([]*storage.ObjectAttrs, error) {
func (c *Client) ListGCSPrefix(bucket string, prefix string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) {
gcsClient, err := c.GCS()
if err != nil {
return nil, err
Expand All @@ -282,7 +282,11 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64)
if attrs == nil {
continue
}
gcsObjects = append(gcsObjects, attrs)

if includeDirObjects || !strings.HasSuffix(attrs.Name, "/") {
gcsObjects = append(gcsObjects, attrs)
}

if maxResults != nil && int64(len(gcsObjects)) >= *maxResults {
break
}
Expand All @@ -291,12 +295,12 @@ func (c *Client) ListGCSPrefix(bucket string, prefix string, maxResults *int64)
return gcsObjects, nil
}

func (c *Client) ListGCSPathDir(gcsDirPath string, maxResults *int64) ([]*storage.ObjectAttrs, error) {
func (c *Client) ListGCSPathDir(gcsDirPath string, includeDirObjects bool, maxResults *int64) ([]*storage.ObjectAttrs, error) {
bucket, gcsDir, err := SplitGCSPath(gcsDirPath)
if err != nil {
return nil, err
}
return c.ListGCSDir(bucket, gcsDir, maxResults)
return c.ListGCSDir(bucket, gcsDir, includeDirObjects, maxResults)
}

// This behaves like you'd expect `ls` to behave on a local file system
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/config/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func ListBucketPrefix(prefix string, maxResults *int64) ([]*storage.ObjectAttrs,
}
return nil, s3Objects, nil
case types.GCPProviderType:
gcsObjects, err := GCP.ListGCSPrefix(GCPCoreConfig.Bucket, prefix, maxResults)
gcsObjects, err := GCP.ListGCSPrefix(GCPCoreConfig.Bucket, prefix, false, maxResults)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/types/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func validateDirModels(
return nil, err
}

gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil)
gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, false, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func validateModels(
}
modelPrefix = s.EnsureSuffix(modelPrefix, "/")

gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, nil)
gcsObjects, err := gcpClient.ListGCSPathDir(modelPath, false, nil)
if err != nil {
return nil, errors.Wrap(err, modelNameWrapStr)
}
Expand Down