16
16
import concurrent .futures
17
17
import logging
18
18
import math
19
+ import multiprocessing
19
20
import os
20
21
import re
21
22
import shutil
22
23
import threading
23
24
from dataclasses import dataclass , field
25
+ from pathlib import Path
24
26
from queue import Queue
25
27
from typing import Any , Callable , Dict , Optional
26
28
@@ -92,6 +94,26 @@ def __init__(
92
94
self .chunk_bytes = chunk_bytes
93
95
self .job_queues = Queue ()
94
96
self ._lock = threading .Lock ()
97
+ self .chunk_download_path = self .get_chunk_download_path (download_path )
98
+
99
def get_chunk_download_path(self, path: str) -> str:
    """Get the path where chunks will be downloaded"""

    # Derive the folder name from the model stub and the file to download:
    # third-from-last path component is the stub; the last two components
    # joined with "_" (dots replaced) name the per-file chunk folder.
    components = path.split(os.path.sep)
    stub = components[-3]
    file_name_as_folder = "_".join(components[-2:]).replace(".", "_")

    # Keep chunks in the user cache, separate from the root model folder.
    cache_root = os.path.join(
        str(Path.home()), ".cache", "sparsezoo", "neuralmagic", "chunks"
    )
    return os.path.join(cache_root, stub, file_name_as_folder)
95
117
96
118
def is_range_header_supported (self ) -> bool :
97
119
"""Check if chunck download is supported"""
@@ -148,9 +170,11 @@ def queue_chunk_download_jobs(self) -> None:
148
170
The jobs need to be executed by a worker or scheduler that processes the
149
171
queued JobQueues.
150
172
"""
151
- download_jobs : Queue = JobQueue (description = "Downloading Chunks" )
173
+ file_name = self .download_path .split (os .path .sep )[- 1 ]
174
+ download_jobs : Queue = JobQueue (
175
+ description = f"Downloading Chunks for { file_name } "
176
+ )
152
177
num_download_jobs = math .ceil (self .file_size / self .chunk_bytes )
153
-
154
178
for job_id in range (num_download_jobs ):
155
179
start_byte = 0 if job_id == 0 else job_id * (self .chunk_bytes ) + 1
156
180
end_byte = (
@@ -161,8 +185,10 @@ def queue_chunk_download_jobs(self) -> None:
161
185
bytes_range = f"bytes={ start_byte } -{ end_byte } "
162
186
163
187
func_kwargs = {
164
- "download_path" : self .get_chunk_file_path (
165
- f"{ job_id :05d} _{ bytes_range } "
188
+ "download_path" : (
189
+ os .path .join (
190
+ self .chunk_download_path , f"{ job_id :05d} _{ bytes_range } "
191
+ )
166
192
),
167
193
"headers" : {
168
194
"Range" : bytes_range ,
@@ -237,7 +263,7 @@ def queue_jobs(self) -> None:
237
263
)
238
264
self .job_queues .put (job_queue )
239
265
240
- def run (self , num_threads : int = 10 ) -> None :
266
+ def run (self , num_threads : int = 1 ) -> None :
241
267
"""
242
268
Executes queued download jobs in parallel using multiple threads.
243
269
@@ -250,6 +276,9 @@ def run(self, num_threads: int = 10) -> None:
250
276
file chunks in parallel. Defaults to 10.
251
277
252
278
"""
279
+ available_threads = multiprocessing .cpu_count () - threading .active_count ()
280
+ num_threads = max (available_threads // 2 , num_threads )
281
+
253
282
is_prev_job_queue_success = True
254
283
while not self .job_queues .empty () and is_prev_job_queue_success :
255
284
job_queue = self .job_queues .get ()
@@ -295,23 +324,25 @@ def execute_job_from_queue(self, job_queue: Queue, **kwargs) -> None:
295
324
with self ._lock :
296
325
job : Job = job_queue .get ()
297
326
success = False
327
+ err = ""
298
328
while not success and job .retries < job .max_retries :
299
329
try :
300
330
job .func (** job .func_kwargs , ** kwargs )
301
331
success = True
302
332
except Exception as _err :
333
+ err = _err
303
334
_LOGGER .debug (
304
335
f"{ job .retries / self .max_retries } : "
305
336
"Failed running {self.func} with kwargs {job.func_kwargs}"
306
337
)
307
- _LOGGER .debug (_err )
338
+ _LOGGER .error (_err )
308
339
job .retries += 1
309
340
if job .retries < job .max_retries :
310
341
job_queue .put (job )
311
342
312
343
if not success :
313
344
_LOGGER .debug (f"Chunk download failed after { self .max_retries } retries." )
314
- raise ValueError
345
+ raise ValueError ( err )
315
346
316
347
def download_file (
317
348
self ,
@@ -339,7 +370,10 @@ def download_file(
339
370
340
371
"""
341
372
write_chunk_size = min (CHUNK_BYTES , self .file_size )
373
+ _LOGGER .debug ("creating " , download_path )
374
+
342
375
create_parent_dirs (download_path )
376
+
343
377
response = requests .get (
344
378
self .url , headers = headers , stream = True , allow_redirects = True
345
379
)
@@ -358,11 +392,10 @@ def combine_chunks_and_delete(self, download_path: str, progress_bar: tqdm) -> N
358
392
:param progress_bar: tqdm object showing the progress of combining chunks
359
393
360
394
"""
361
- parent_directory = os .path .dirname (download_path )
362
- chunk_directory = os .path .join (parent_directory , "chunks" )
395
+ _LOGGER .debug ("Combing and deleting " , self .chunk_download_path )
363
396
364
397
pattern = re .compile (r"\d+_bytes=" )
365
- files = os .listdir (chunk_directory )
398
+ files = os .listdir (self . chunk_download_path )
366
399
367
400
chunk_files = [chunk_file for chunk_file in files if pattern .match (chunk_file )]
368
401
@@ -371,13 +404,13 @@ def combine_chunks_and_delete(self, download_path: str, progress_bar: tqdm) -> N
371
404
create_parent_dirs (self .download_path )
372
405
with open (self .download_path , "wb" ) as combined_file :
373
406
for file_path in sorted_chunk_files :
374
- chunk_path = os .path .join (chunk_directory , file_path )
407
+ chunk_path = os .path .join (self . chunk_download_path , file_path )
375
408
with open (chunk_path , "rb" ) as infile :
376
409
data = infile .read ()
377
410
combined_file .write (data )
378
411
progress_bar .update (len (data ))
379
412
380
- shutil .rmtree (chunk_directory )
413
+ shutil .rmtree (self . chunk_download_path )
381
414
382
415
def get_chunk_file_path (self , file_range : str ) -> str :
383
416
"""
0 commit comments