Skip to content

Commit f76eb97

Browse files
committed
refactor: 把 tempfile 修改成了 io.BytesIO
1 parent 4d19754 commit f76eb97

File tree

11 files changed

+68
-73
lines changed

11 files changed

+68
-73
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ config
77
cache
88
database/*.db
99
cluster_status
10+
tianxiu2b2t
1011

1112
**/__pycache__
1213

core/cluster.py

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import contextlib
22
import datetime
33
import hmac
4+
import io
45
import json
56
import os
67
from pathlib import Path
@@ -577,44 +578,40 @@ async def _download_file(
577578
for _ in range(10):
578579
size = 0
579580
hash = utils.get_hash_obj(file.hash)
580-
with tempfile.NamedTemporaryFile(
581-
dir=self._cache_dir,
582-
delete=False
583-
) as tmp_file:
584-
try:
585-
async with session.get(
586-
file.path
587-
) as resp:
588-
while (data := await resp.content.read(1024 * 1024 * 16)):
589-
tmp_file.write(data)
590-
hash.update(data)
591-
inc = len(data)
592-
size += inc
593-
self._pbar.update(inc)
594-
pbar.update(inc)
595-
if hash.hexdigest() != file.hash or size != file.size:
596-
await anyio.sleep(50)
597-
raise Exception(f"hash mismatch, got {hash.hexdigest()} expected {file.hash}")
598-
await self.upload_storage(file, tmp_file, size)
599-
self.update_success()
600-
except Exception as e:
601-
last_error = e
602-
self._pbar.update(-size)
603-
pbar.update(-size)
604-
self.update_failed()
605-
continue
606-
finally:
607-
tmp_file.close()
608-
os.remove(tmp_file.name)
609-
return None
581+
tmp_file = io.BytesIO()
582+
try:
583+
async with session.get(
584+
file.path
585+
) as resp:
586+
while (data := await resp.content.read(1024 * 1024 * 16)):
587+
tmp_file.write(data)
588+
hash.update(data)
589+
inc = len(data)
590+
size += inc
591+
self._pbar.update(inc)
592+
pbar.update(inc)
593+
if hash.hexdigest() != file.hash or size != file.size:
594+
await anyio.sleep(50)
595+
raise Exception(f"hash mismatch, got {hash.hexdigest()} expected {file.hash}")
596+
await self.upload_storage(file, tmp_file, size)
597+
self.update_success()
598+
except Exception as e:
599+
last_error = e
600+
self._pbar.update(-size)
601+
pbar.update(-size)
602+
self.update_failed()
603+
continue
604+
finally:
605+
tmp_file.close()
606+
return None
610607
if last_error is not None:
611608
raise last_error
612609

613610

614611
async def upload_storage(
615612
self,
616613
file: BMCLAPIFile,
617-
tmp_file: 'tempfile._TemporaryFileWrapper',
614+
data: io.BytesIO,
618615
size: int
619616
):
620617
missing_storage = [
@@ -623,11 +620,11 @@ async def upload_storage(
623620
if len(missing_storage) == 0:
624621
return
625622
for storage in missing_storage:
626-
tmp_file.seek(0)
623+
data.seek(0)
627624
retries = 0
628625
while 1:
629626
try:
630-
await storage.storage.upload_download_file(f"{file.hash[:2]}/{file.hash}", tmp_file, size)
627+
await storage.storage.upload_download_file(f"{file.hash[:2]}/{file.hash}", data, size)
631628
break
632629
except:
633630
if retries >= 10:

core/dashboard.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ async def setup(
101101
app: fastapi.FastAPI,
102102
task_group: anyio.abc.TaskGroup,
103103
):
104-
if not DEBUG:
105-
return
104+
#if not DEBUG:
105+
# return
106106

107107
systeminfo.setup()
108108

core/logger.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,16 @@ def emit(self, record):
114114

115115
# 配置拦截处理器
116116
#logging.basicConfig(handlers=[InterceptHandler()], level=logging.DEBUG)
117-
#logging.getLogger("uvicorn").handlers = [InterceptHandler()]
118-
#logging.getLogger("uvicorn.access").handlers = [InterceptHandler()]
119-
#logging.getLogger("uvicorn.error").handlers = [InterceptHandler()]
120-
#logging.getLogger("engineio.client").handlers = [DebugHandler()]
121-
#logging.getLogger("socketio.client").handlers = [DebugHandler()]
117+
logging.getLogger("uvicorn").handlers = [InterceptHandler()]
118+
logging.getLogger("uvicorn.access").handlers = [InterceptHandler()]
119+
logging.getLogger("uvicorn.error").handlers = [InterceptHandler()]
120+
logging.getLogger("engineio.client").handlers.append(InterceptHandler())
121+
logging.getLogger("socketio.client").handlers.append(InterceptHandler())
122122
logging.basicConfig(
123-
filename="./logs/logging.log"
123+
#format=LOGGER_FORMAT,
124+
handlers=[InterceptHandler()],
125+
#filename="./logs/logging.log",
126+
level=logging.DEBUG
124127
)
125128

126129

core/storage/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
"local": LocalStorage,
2222
"alist": AlistStorage,
2323
"webdav": WebDavStorage,
24-
"s3": S3Storage,
24+
#"s3": S3Storage,
2525
"minio": MinioStorage
2626
}
2727

core/storage/abc.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import tempfile
33

44
import anyio.abc
5+
import io
56

67
from core import utils
78
from core.abc import BMCLAPIFile, ResponseFile, ResponseFileNotFound, ResponseFileMemory, ResponseFileLocal, ResponseFileRemote
@@ -151,15 +152,15 @@ async def works(root_ids: list[int]):
151152
async def upload(
152153
self,
153154
path: str,
154-
tmp_file: tempfile._TemporaryFileWrapper,
155+
data: io.BytesIO,
155156
size: int
156157
):
157158
raise NotImplementedError
158159

159-
async def upload_download_file(self, path: str, tmp_file: tempfile._TemporaryFileWrapper, size: int):
160+
async def upload_download_file(self, path: str, data: io.BytesIO, size: int):
160161
if self.download_dir:
161162
path = f"download/{path}"
162-
await self.upload(f"download/{path}", tmp_file, size)
163+
await self.upload(f"download/{path}", data, size)
163164

164165
async def get_response_file(
165166
self,
@@ -197,15 +198,12 @@ async def write_measure(self, size: int):
197198
path = f"measure/{size}"
198199
size = size * 1024 * 1024
199200

200-
with tempfile.NamedTemporaryFile() as tmp:
201-
tmp.write(b'\x00' * size)
202-
tmp.seek(0)
203-
await self.upload(
204-
path,
205-
tmp,
206-
size
207-
)
208-
logger.tsuccess("storage.write_measure", size=int(size / (1024 * 1024)), name=self.name, type=self.type)
201+
await self.upload(
202+
path,
203+
io.BytesIO(b"\x00" * size),
204+
size
205+
)
206+
logger.tsuccess("storage.write_measure", size=int(size / (1024 * 1024)), name=self.name, type=self.type)
209207

210208

211209
def get_py_check_path(self) -> 'CPath':

core/storage/alist.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import io
12
from tempfile import _TemporaryFileWrapper
23
import time
34
from typing import Any
@@ -167,7 +168,7 @@ async def list_files(self, path: str) -> list[abc.FileInfo]:
167168
))
168169
return res
169170

170-
async def upload(self, path: str, tmp_file: _TemporaryFileWrapper, size: int):
171+
async def upload(self, path: str, data: io.BytesIO, size: int):
171172
async with aiohttp.ClientSession(
172173
base_url=self._endpoint,
173174
headers={
@@ -180,10 +181,10 @@ async def upload(self, path: str, tmp_file: _TemporaryFileWrapper, size: int):
180181
headers={
181182
"File-Path": urlparse.quote(str(self._path / path)),
182183
},
183-
data=tmp_file.file
184+
data=data.getbuffer()
184185
) as resp:
185-
data = AlistResponse(await resp.json())
186-
data.raise_for_status()
186+
alist_resp = AlistResponse(await resp.json())
187+
alist_resp.raise_for_status()
187188
return True
188189

189190
async def get_file(self, path: str) -> abc.ResponseFile:

core/storage/local.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import os
21
from pathlib import Path
3-
import tempfile
2+
import io
43
import time
54
import anyio.abc
65

@@ -54,16 +53,13 @@ async def list_files(
5453
async def upload(
5554
self,
5655
path: str,
57-
tmp_file: tempfile._TemporaryFileWrapper,
56+
data: io.BytesIO,
5857
size: int
5958
):
6059
root = Path(str(self.path)) / path
6160
root.parent.mkdir(parents=True, exist_ok=True)
6261
with open(root, "wb") as f:
63-
while (data := tmp_file.read(65536)):
64-
if not data:
65-
break
66-
f.write(data)
62+
f.write(data.getbuffer())
6763
return True
6864

6965
async def _check(

core/storage/minio.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,15 @@ async def list_files(
9494
async def upload(
9595
self,
9696
path: str,
97-
tmp_file: tempfile._TemporaryFileWrapper,
97+
data: io.BytesIO,
9898
size: int
9999
):
100100
root = self.path / path
101101

102102
await self.minio.put_object(
103103
self.bucket,
104104
str(root)[1:],
105-
tmp_file,
105+
data.getbuffer(),
106106
size
107107
)
108108
return True

core/storage/s3.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from io import BytesIO
2+
import io
23
import tempfile
34
import time
45
import aioboto3.session
@@ -137,7 +138,7 @@ async def list_files(
137138
async def upload(
138139
self,
139140
path: str,
140-
tmp_file: tempfile._TemporaryFileWrapper,
141+
data: io.BytesIO,
141142
size: int
142143
):
143144
async with self.session.resource(
@@ -149,7 +150,7 @@ async def upload(
149150
) as resource:
150151
bucket = await resource.Bucket(self.bucket)
151152
obj = await bucket.Object(str(self.path / path))
152-
await obj.upload_fileobj(tmp_file)
153+
await obj.upload_fileobj(data)
153154
return True
154155

155156

0 commit comments

Comments
 (0)