Skip to content

Commit 043c610

Browse files
committed
Add retries to get_json(), use in get_resource_status() (#286)
(cherry picked from commit d4bbfd2)
1 parent b0d7a7f commit 043c610

File tree

3 files changed

+81 additions, -19 deletions

pkg/workloads/cortex/lib/context.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -371,7 +371,7 @@ def model_config(self, model_name):
371371

372372
def get_resource_status(self, resource):
373373
key = self.resource_status_key(resource)
374-
return self.storage.get_json(key)
374+
return self.storage.get_json(key, num_retries=5)
375375

376376
def upload_resource_status_start(self, *resources):
377377
timestamp = util.now_timestamp_rfc_3339()

pkg/workloads/cortex/lib/storage/local.py

Lines changed: 33 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import msgpack
2222
from pathlib import Path
2323
import shutil
24+
import time
2425

2526
from cortex.lib import util
2627
from cortex.lib.exceptions import CortexException
@@ -38,7 +39,17 @@ def _get_or_create_path(self, key):
3839
p.parent.mkdir(parents=True, exist_ok=True)
3940
return p
4041

41-
def _get_path_if_exists(self, key, allow_missing=False):
42+
def _get_path_if_exists(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
43+
while True:
44+
try:
45+
return self._get_path_if_exists_single(key, allow_missing=allow_missing)
46+
except:
47+
if num_retries <= 0:
48+
raise
49+
num_retries -= 1
50+
time.sleep(retry_delay_sec)
51+
52+
def _get_path_if_exists_single(self, key, allow_missing=False):
4253
p = Path(os.path.join(self.base_dir, key))
4354
if not p.exists() and allow_missing:
4455
return None
@@ -69,8 +80,13 @@ def put_json(self, obj, key):
6980
f = self._get_or_create_path(key)
7081
f.write_text(json.dumps(obj))
7182

72-
def get_json(self, key, allow_missing=False):
73-
f = self._get_path_if_exists(key, allow_missing)
83+
def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
84+
f = self._get_path_if_exists(
85+
key,
86+
allow_missing=allow_missing,
87+
num_retries=num_retries,
88+
retry_delay_sec=retry_delay_sec,
89+
)
7490
if f is None:
7591
return None
7692
return json.loads(f.read_text())
@@ -79,8 +95,13 @@ def put_msgpack(self, obj, key):
7995
f = self._get_or_create_path(key)
8096
f.write_bytes(msgpack.dumps(obj))
8197

82-
def get_msgpack(self, key, allow_missing=False):
83-
f = self._get_path_if_exists(key, allow_missing)
98+
def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
99+
f = self._get_path_if_exists(
100+
key,
101+
allow_missing=allow_missing,
102+
num_retries=num_retries,
103+
retry_delay_sec=retry_delay_sec,
104+
)
84105
if f is None:
85106
return None
86107
return msgpack.loads(f.read_bytes())
@@ -89,8 +110,13 @@ def put_pyobj(self, obj, key):
89110
f = self._get_or_create_path(key)
90111
f.write_bytes(pickle.dumps(obj))
91112

92-
def get_pyobj(self, key, bucket, allow_missing=False):
93-
f = self._get_path_if_exists(key, allow_missing)
113+
def get_pyobj(self, key, bucket, allow_missing=False, num_retries=0, retry_delay_sec=2):
114+
f = self._get_path_if_exists(
115+
key,
116+
allow_missing=allow_missing,
117+
num_retries=num_retries,
118+
retry_delay_sec=retry_delay_sec,
119+
)
94120
if f is None:
95121
return None
96122
return pickle.loads(f.read_bytes())

pkg/workloads/cortex/lib/storage/s3.py

Lines changed: 47 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
import pickle
2020
import json
2121
import msgpack
22+
import time
2223

2324
from cortex.lib import util
2425
from cortex.lib.exceptions import CortexException
@@ -114,18 +115,32 @@ def _get_matching_s3_keys_generator(self, prefix="", suffix=""):
114115
def _upload_string_to_s3(self, string, key):
115116
self.s3.put_object(Bucket=self.bucket, Key=key, Body=string)
116117

117-
def _read_bytes_from_s3(self, key, allow_missing=False, ext_bucket=None):
118+
def _read_bytes_from_s3(
119+
self, key, allow_missing=False, ext_bucket=None, num_retries=0, retry_delay_sec=2
120+
):
121+
while True:
122+
try:
123+
return self._read_bytes_from_s3_single(
124+
key, allow_missing=allow_missing, ext_bucket=ext_bucket
125+
)
126+
except:
127+
if num_retries <= 0:
128+
raise
129+
num_retries -= 1
130+
time.sleep(retry_delay_sec)
131+
132+
def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None):
118133
bucket = self.bucket
119134
if ext_bucket is not None:
120135
bucket = ext_bucket
121136

122137
try:
123138
try:
124139
byte_array = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read()
125-
except self.s3.exceptions.NoSuchKey as e:
140+
except self.s3.exceptions.NoSuchKey:
126141
if allow_missing:
127142
return None
128-
raise e
143+
raise
129144
except Exception as e:
130145
raise CortexException(
131146
'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket)
@@ -140,26 +155,41 @@ def search(self, prefix="", suffix=""):
140155
def put_json(self, obj, key):
141156
self._upload_string_to_s3(json.dumps(obj), key)
142157

143-
def get_json(self, key, allow_missing=False):
144-
obj = self._read_bytes_from_s3(key, allow_missing)
158+
def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
159+
obj = self._read_bytes_from_s3(
160+
key,
161+
allow_missing=allow_missing,
162+
num_retries=num_retries,
163+
retry_delay_sec=retry_delay_sec,
164+
)
145165
if obj is None:
146166
return None
147167
return json.loads(obj.decode("utf-8"))
148168

149169
def put_msgpack(self, obj, key):
150170
self._upload_string_to_s3(msgpack.dumps(obj), key)
151171

152-
def get_msgpack(self, key, allow_missing=False):
153-
obj = self._read_bytes_from_s3(key, allow_missing)
172+
def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
173+
obj = self._read_bytes_from_s3(
174+
key,
175+
allow_missing=allow_missing,
176+
num_retries=num_retries,
177+
retry_delay_sec=retry_delay_sec,
178+
)
154179
if obj == None:
155180
return None
156181
return msgpack.loads(obj, raw=False)
157182

158183
def put_pyobj(self, obj, key):
159184
self._upload_string_to_s3(pickle.dumps(obj), key)
160185

161-
def get_pyobj(self, key, allow_missing=False):
162-
obj = self._read_bytes_from_s3(key, allow_missing)
186+
def get_pyobj(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
187+
obj = self._read_bytes_from_s3(
188+
key,
189+
allow_missing=allow_missing,
190+
num_retries=num_retries,
191+
retry_delay_sec=retry_delay_sec,
192+
)
163193
if obj is None:
164194
return None
165195
return pickle.loads(obj)
@@ -207,9 +237,15 @@ def download_file_external(self, s3_path, local_path):
207237
+ "it may not exist, or you may not have suffienct permissions"
208238
) from e
209239

210-
def get_json_external(self, s3_path):
240+
def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2):
211241
bucket, key = self.deconstruct_s3_path(s3_path)
212-
obj = self._read_bytes_from_s3(key, ext_bucket=bucket)
242+
obj = self._read_bytes_from_s3(
243+
key,
244+
allow_missing=False,
245+
ext_bucket=bucket,
246+
num_retries=num_retries,
247+
retry_delay_sec=retry_delay_sec,
248+
)
213249
if obj is None:
214250
return None
215251
return json.loads(obj.decode("utf-8"))

0 commit comments

Comments
 (0)