4444
4545DEFAULT_MIME_TYPE = "application/octet-stream"
4646
47+ FILE_REPORT_LIMIT = 1000
48+
4749
4850class S3Client (object ):
4951 """The S3Client is a wrapper of the original boto3 s3 client, which will provide
@@ -132,12 +134,14 @@ async def path_upload_handler(
132134 ):
133135 async with self .__con_sem :
134136 if not os .path .isfile (full_file_path ):
135- logger .warning ('Warning: file %s does not exist during uploading. Product: %s' ,
136- full_file_path , product )
137+ logger .warning (
138+ 'Warning: file %s does not exist during uploading. Product: %s' ,
139+ full_file_path , product
140+ )
137141 failed .append (full_file_path )
138142 return
139143
140- logger .info (
144+ logger .debug (
141145 '(%d/%d) Uploading %s to bucket %s' ,
142146 index , total , full_file_path , bucket_name
143147 )
@@ -175,16 +179,18 @@ async def path_upload_handler(
175179 if product :
176180 await self .__update_prod_info (path_key , bucket_name , [product ])
177181
178- logger .info ('Uploaded %s to bucket %s' , path , bucket_name )
182+ logger .debug ('Uploaded %s to bucket %s' , path , bucket_name )
179183 uploaded_files .append (path_key )
180184 except (ClientError , HTTPClientError ) as e :
181- logger .error ("ERROR: file %s not uploaded to bucket"
182- " %s due to error: %s " , full_file_path ,
183- bucket_name , e )
185+ logger .error (
186+ "ERROR: file %s not uploaded to bucket"
187+ " %s due to error: %s " ,
188+ full_file_path , bucket_name , e
189+ )
184190 failed .append (full_file_path )
185191 return
186192 else :
187- logger .info (
193+ logger .debug (
188194 "File %s already exists, check if need to update product." ,
189195 full_file_path ,
190196 )
@@ -193,17 +199,19 @@ async def path_upload_handler(
193199 f_meta [CHECKSUM_META_KEY ] if CHECKSUM_META_KEY in f_meta else ""
194200 )
195201 if checksum != "" and checksum .strip () != sha1 :
196- logger .error ('Error: checksum check failed. The file %s is '
197- 'different from the one in S3. Product: %s' ,
198- path_key , product )
202+ logger .error (
203+ 'Error: checksum check failed. The file %s is '
204+ 'different from the one in S3. Product: %s' ,
205+ path_key , product
206+ )
199207 failed .append (full_file_path )
200208 return
201209 (prods , no_error ) = await self .__run_async (
202210 self .__get_prod_info ,
203211 path_key , bucket_name
204212 )
205213 if not self .__dry_run and no_error and product not in prods :
206- logger .info (
214+ logger .debug (
207215 "File %s has new product, updating the product %s" ,
208216 full_file_path ,
209217 product ,
@@ -215,7 +223,9 @@ async def path_upload_handler(
215223 return
216224
217225 return (uploaded_files , self .__do_path_cut_and (
218- file_paths = file_paths , path_handler = path_upload_handler , root = root
226+ file_paths = file_paths ,
227+ path_handler = self .__path_handler_count_wrapper (path_upload_handler ),
228+ root = root
219229 ))
220230
221231 def upload_metadatas (
@@ -238,12 +248,14 @@ async def path_upload_handler(
238248 ):
239249 async with self .__con_sem :
240250 if not os .path .isfile (full_file_path ):
241- logger .warning ('Warning: file %s does not exist during uploading. Product: %s' ,
242- full_file_path , product )
251+ logger .warning (
252+ 'Warning: file %s does not exist during uploading. Product: %s' ,
253+ full_file_path , product
254+ )
243255 failed .append (full_file_path )
244256 return
245257
246- logger .info (
258+ logger .debug (
247259 '(%d/%d) Updating metadata %s to bucket %s' ,
248260 index , total , path , bucket_name
249261 )
@@ -276,9 +288,9 @@ async def path_upload_handler(
276288 )
277289 )
278290 if product :
279- # NOTE: This should not happen for most cases, as most of the metadata
280- # file does not have product info. Just leave for requirement change in
281- # future
291+ # NOTE: This should not happen for most cases, as most
292+ # of the metadata file does not have product info. Just
293+ # leave for requirement change in future
282294 (prods , no_error ) = await self .__run_async (
283295 self .__get_prod_info ,
284296 path_key , bucket_name
@@ -288,20 +300,26 @@ async def path_upload_handler(
288300 return
289301 if no_error and product not in prods :
290302 prods .append (product )
291- updated = await self .__update_prod_info (path_key , bucket_name , prods )
303+ updated = await self .__update_prod_info (
304+ path_key , bucket_name , prods
305+ )
292306 if not updated :
293307 failed .append (full_file_path )
294308 return
295- logger .info ('Updated metadata %s to bucket %s' , path , bucket_name )
309+ logger .debug ('Updated metadata %s to bucket %s' , path , bucket_name )
296310 uploaded_files .append (path_key )
297311 except (ClientError , HTTPClientError ) as e :
298- logger .error ("ERROR: file %s not uploaded to bucket"
299- " %s due to error: %s " , full_file_path ,
300- bucket_name , e )
312+ logger .error (
313+ "ERROR: file %s not uploaded to bucket"
314+ " %s due to error: %s " ,
315+ full_file_path , bucket_name , e
316+ )
301317 failed .append (full_file_path )
302318
303319 return (uploaded_files , self .__do_path_cut_and (
304- file_paths = meta_file_paths , path_handler = path_upload_handler , root = root
320+ file_paths = meta_file_paths ,
321+ path_handler = self .__path_handler_count_wrapper (path_upload_handler ),
322+ root = root
305323 ))
306324
307325 def upload_manifest (
@@ -346,7 +364,7 @@ async def path_delete_handler(
346364 total : int , failed : List [str ]
347365 ):
348366 async with self .__con_sem :
349- logger .info ('(%d/%d) Deleting %s from bucket %s' , index , total , path , bucket_name )
367+ logger .debug ('(%d/%d) Deleting %s from bucket %s' , index , total , path , bucket_name )
350368 path_key = os .path .join (key_prefix , path ) if key_prefix else path
351369 file_object = bucket .Object (path_key )
352370 existed = await self .__run_async (self .__file_exists , file_object )
@@ -368,13 +386,13 @@ async def path_delete_handler(
368386
369387 if len (prods ) > 0 :
370388 try :
371- logger .info (
389+ logger .debug (
372390 "File %s has other products overlapping,"
373391 " will remove %s from its metadata" ,
374392 path , product
375393 )
376394 await self .__update_prod_info (path_key , bucket_name , prods )
377- logger .info (
395+ logger .debug (
378396 "Removed product %s from metadata of file %s" ,
379397 product , path
380398 )
@@ -405,18 +423,25 @@ async def path_delete_handler(
405423 deleted_files .append (path )
406424 return
407425 except (ClientError , HTTPClientError ) as e :
408- logger .error ("ERROR: file %s failed to delete from bucket"
409- " %s due to error: %s " , full_file_path ,
410- bucket_name , e )
426+ logger .error (
427+ "ERROR: file %s failed to delete from bucket"
428+ " %s due to error: %s " ,
429+ full_file_path , bucket_name , e
430+ )
411431 failed .append (full_file_path )
412432 return
413433 else :
414- logger .info ("File %s does not exist in s3 bucket %s, skip deletion." ,
415- path , bucket_name )
434+ logger .debug (
435+ "File %s does not exist in s3 bucket %s, skip deletion." ,
436+ path , bucket_name
437+ )
416438 return
417439
418440 failed_files = self .__do_path_cut_and (
419- file_paths = file_paths , path_handler = path_delete_handler , root = root )
441+ file_paths = file_paths ,
442+ path_handler = self .__path_handler_count_wrapper (path_delete_handler ),
443+ root = root
444+ )
420445
421446 return (deleted_files , failed_files )
422447
@@ -594,6 +619,21 @@ async def __update_prod_info(
594619 "due to error: %s" , file , e )
595620 return False
596621
def __path_handler_count_wrapper(
    self,
    path_handler: Callable[[str, str, int, int, List[str], asyncio.Semaphore], Awaitable[bool]]
) -> Callable[[str, str, int, int, List[str], asyncio.Semaphore], Awaitable[bool]]:
    """Wrap a per-path handler so coarse progress is logged at INFO level.

    The returned coroutine function forwards its five arguments
    (full_file_path, path, index, total, failed) to ``path_handler`` and
    returns whatever the handler returns. After each handler call ends,
    whether it returned or raised, a completion counter is incremented;
    a progress line is logged every ``FILE_REPORT_LIMIT`` completions and
    once more when the counter reaches ``total``.

    The counter lives in this closure, so every call to this method
    yields a wrapper with its own independent count.

    Args:
        path_handler: the async handler to wrap.

    Returns:
        An async callable taking the same five arguments as the wrapper
        below and delegating to ``path_handler``.
    """
    # NOTE(review): the annotation lists six parameters (including an
    # asyncio.Semaphore) but the wrapper forwards five; kept unchanged
    # here to stay in sync with the other signatures -- confirm and fix
    # them together.

    # Count real completions instead of relying on the task's submission
    # index: wrapped handlers are awaited concurrently, so the index of
    # the task that happens to finish says nothing about how many tasks
    # have finished so far.
    finished = 0

    async def wrapper(
        full_file_path: str, path: str, index: int,
        total: int, failed: List[str]
    ):
        nonlocal finished
        try:
            # Propagate the handler's result rather than dropping it.
            return await path_handler(
                full_file_path, path, index, total, failed
            )
        finally:
            finished += 1
            # Also report when the last file completes, so runs that are
            # smaller than (or not a multiple of) FILE_REPORT_LIMIT still
            # produce a final progress line.
            if finished % FILE_REPORT_LIMIT == 0 or finished == total:
                logger.info(
                    "######### %d/%d files finished", finished, total
                )

    return wrapper
636+
597637 def __do_path_cut_and (
598638 self , file_paths : List [str ],
599639 path_handler : Callable [[str , str , int , int , List [str ], asyncio .Semaphore ], Awaitable [bool ]],
@@ -616,6 +656,7 @@ def __do_path_cut_and(
616656 )
617657 )
618658 index += 1
659+
619660 loop = asyncio .get_event_loop ()
620661 loop .run_until_complete (asyncio .gather (* tasks ))
621662 return failed_paths
0 commit comments