使用Python和Boto3高效统计AWS S3存储桶中特定文件数量

使用Python和Boto3高效统计AWS S3存储桶中特定文件数量

本教程详细介绍了如何使用python和Boto3库高效统计AWS S3存储桶中符合特定命名模式的文件数量。文章重点讲解了Boto3客户端与资源对象的选择、Prefix参数的正确使用、以及如何处理S3对象列表的自动分页,并提供了实用的代码示例,帮助用户精确筛选和统计S3文件。

理解S3对象列表与Boto3

在AWS S3中,数据以对象(Object)的形式存储,每个对象都有一个唯一的键(Key),它包含了对象的完整路径和文件名。S3并没有传统文件系统中的“文件夹”概念,而是通过对象键的层级结构(例如folder1/folder2/file.txt)来模拟目录。使用Boto3库与S3交互时,选择正确的接口和理解参数至关重要。

Boto3 Client vs. Resource

Boto3提供了两种主要的接口来与AWS服务交互:client和resource。

  • client (低级接口):提供与AWS API一对一的映射,功能更原始,需要手动处理分页、错误等。例如,s3.client(‘s3’).list_objects_v2()。
  • resource (高级接口):在client之上封装了一层更高级、更Pythonic的抽象,通常更易于使用。它会自动处理分页,并提供更直观的对象模型。例如,s3.resource(‘s3’).Bucket(‘your-bucket’).objects.Filter()。

对于需要遍历大量S3对象并进行筛选的场景,推荐使用boto3.resource,因为它能自动处理S3 API的1000个对象限制(即分页),避免手动管理ContinuationToken。

Prefix参数的作用

在S3的list_objects_v2或objects.filter()方法中,Prefix参数用于指定一个字符串,只有对象键以此字符串开头的对象才会被返回。这是模拟“遍历文件夹”的关键。

立即学习Python免费学习笔记(深入)”;

重要提示:Prefix参数应是S3桶内路径的一部分,不应包含S3 URL的协议头(s3://)或桶名。例如,如果S3 URL是s3://my-bucket/path/to/folder/,那么桶名是my-bucket,Prefix应该是path/to/folder/。如果Prefix被错误地设置为完整的S3 URL,将无法匹配到任何对象,导致计数为零。

实现S3文件统计逻辑

为了高效且准确地统计S3中特定文件(例如,名称以file_开头且以.ts结尾的视频分块文件),我们需要一个函数来执行以下步骤:

  1. 从给定的S3 URL中解析出桶名和前缀。
  2. 使用Boto3 Resource获取指定前缀下的所有对象。
  3. 遍历这些对象,并根据其键(object.key)进行精确的模式匹配。

1. 解析S3 URL为桶名和前缀

S3 URL通常格式为s3:///

。我们需要编写一个辅助函数来提取这些信息。</p><pre class="brush:php;toolbar:false">import re def parse_s3_url(s3_url): """ 从S3 URL中解析出桶名和前缀。 例如:’s3://coursevideotesting/Financial_Freedom_Course_Kannada/00_Course_Trailer_New_update/360p/’ 返回:(‘coursevideotesting’, ‘Financial_Freedom_Course_Kannada/00_Course_Trailer_New_update/360p/’) """ match = re.match(r’s3://([^/]+)/(.*)’, s3_url) if match: bucket_name = match.group(1) # 确保前缀以斜杠结尾,如果不是文件而是目录 prefix = match.group(2) if prefix and not prefix.endswith(‘/’): prefix += ‘/’ return bucket_name, prefix else: raise ValueError(f"Invalid S3 URL format: {s3_url}") # 示例 # bucket, prefix = parse_s3_url(‘s3://my-bucket/path/to/folder/’) # print(f"Bucket: {bucket}, Prefix: {prefix}")

2. 构建文件统计函数

现在,我们可以创建一个函数来统计指定S3路径下符合条件的文件。这里我们将统计所有键以prefix开头,且包含file_并以.ts结尾的文件。

import boto3  def count_specific_files_in_s3(bucket_name, prefix, file_pattern_start='file_', file_pattern_end='.ts'):     """     统计S3桶中指定前缀下,符合特定命名模式的文件数量。      Args:         bucket_name (str): S3桶的名称。         prefix (str): 要搜索的前缀路径(例如 'path/to/folder/')。         file_pattern_start (str): 文件名必须以此字符串开头(例如 'file_')。         file_pattern_end (str): 文件名必须以此字符串结尾(例如 '.ts')。      Returns:         int: 符合条件的文件数量。     """     s3_resource = boto3.resource('s3')     bucket = s3_resource.Bucket(bucket_name)      count = 0     print(f"Checking folder: s3://{bucket_name}/{prefix}")      try:         # 使用filter(Prefix=prefix)获取指定路径下的所有对象         # boto3 resource会自动处理分页         for obj in bucket.objects.filter(Prefix=prefix):             # object.key 是对象的完整键,例如 'path/to/folder/file_001.ts'             # 确保我们只计算当前“目录”下的文件,而不是子目录中的文件             # 并且匹配特定的文件名模式              # 假设prefix是 'path/to/folder/'             # 那么文件键应该是 'path/to/folder/file_XXX.ts'             # 我们需要检查文件键是否以完整的prefix开头,并且在prefix之后的部分符合我们的文件名模式              # 获取对象键中,去除prefix后的相对路径/文件名             relative_key = obj.key[len(prefix):] if obj.key.startswith(prefix) else obj.key              # 排除自身作为目录的情况 (例如 prefix是'foo/',obj.key是'foo/')             # 排除子目录 (例如 prefix是'foo/',obj.key是'foo/bar/file.ts')             # 只统计当前层级的文件             if '/' not in relative_key and                 relative_key.startswith(file_pattern_start) and                 relative_key.endswith(file_pattern_end):                 count += 1                 # print(f"Matched file: {obj.key}") # 调试用         print(f"Actual chunks found: {count}")         return count     except Exception as e:         print(f"Error Accessing S3 bucket {bucket_name} with prefix {prefix}: {e}")         return 0 # 返回0表示发生错误或无法访问  # 示例用法 (假设你的AWS凭证已配置) # bucket_name = 'coursevideotesting' # prefix = 'Financial_Freedom_Course_Kannada/00_Course_Trailer_New_update/360p/' # actual_count = count_specific_files_in_s3(bucket_name, prefix) # print(f"Count for {prefix}: {actual_count}")

3. 整合CSV处理流程

现在,我们将上述功能集成到读取输入CSV、处理S3 URL并写入结果CSV的完整脚本中。

import csv import boto3 import re  # 1. 解析S3 URL的辅助函数 def parse_s3_url(s3_url):     match = re.match(r's3://([^/]+)/(.*)', s3_url)     if match:         bucket_name = match.group(1)         prefix = match.group(2)         # 确保前缀以斜杠结尾,如果它代表一个目录         if prefix and not prefix.endswith('/'):             prefix += '/'         return bucket_name, prefix     else:         raise ValueError(f"Invalid S3 URL format: {s3_url}")  # 2. 统计S3中特定文件的函数 def count_specific_files_in_s3(bucket_name, prefix, file_pattern_start='file_', file_pattern_end='.ts'):     s3_resource = boto3.resource('s3')     bucket = s3_resource.Bucket(bucket_name)      count = 0     print(f"Checking folder: s3://{bucket_name}/{prefix}")      try:         for obj in bucket.objects.filter(Prefix=prefix):             # object.key 是完整的对象键,例如 'path/to/folder/file_001.ts'             # 我们需要确保只统计当前层级的文件,而不是子目录中的文件             # 例如,如果prefix是'videos/qualityA/'             # obj.key可能是 'videos/qualityA/file_001.ts' (要统计)             # 或 'videos/qualityA/subfolder/file_001.ts' (不统计)              # 提取对象键中,去除prefix后的相对路径/文件名             # 如果obj.key不以prefix开头,则跳过(理论上不会发生,因为filter已经处理了)             if not obj.key.startswith(prefix):                 continue              relative_key = obj.key[len(prefix):]              # 检查relative_key是否包含斜杠,如果有,说明是子目录或子目录中的文件,跳过             # 并且检查文件名是否符合模式             if '/' not in relative_key and                 relative_key.startswith(file_pattern_start) and                 relative_key.endswith(file_pattern_end):                 count += 1         print(f"Actual chunks found: {count}")         return count     except Exception as e:         print(f"Error accessing S3 bucket {bucket_name} with prefix {prefix}: {e}")         return 0 # 发生错误时返回0  # --- 主脚本逻辑 --- # 输入和输出csv文件名称 input_csv_file = 'ldt_ffw_course_videos_temp.csv'  # 替换为你的输入CSV文件 output_csv_file = 'file_count_result.csv'          # 替换为你的输出CSV文件  # 确保输入CSV文件存在并包含正确的列名 # 假设输入CSV包含 'course_video_s3_url' 和 'course_video_ts_file_cnt' 列  # 读取URLs从输入CSV并检查文件数量 with open(input_csv_file, mode='r', encoding='utf-8') as infile,       open(output_csv_file, mode='w', newline='', encoding='utf-8') as outfile:      reader = csv.DictReader(infile)     # 检查CSV列名是否存在     if 'course_video_s3_url' not in reader.fieldnames or         'course_video_ts_file_cnt' not in reader.fieldnames:         raise ValueError(f"Input CSV must contain 'course_video_s3_url' and 'course_video_ts_file_cnt' columns. Found: {reader.fieldnames}")      fieldnames = ['URL', 'Actual Files', 'Expected Files']     writer = csv.DictWriter(outfile, fieldnames=fieldnames)     writer.writeheader()      for row in reader:         s3_url_from_csv = row['course_video_s3_url']         expected_files = int(row['course_video_ts_file_cnt'])          try:             # 解析S3 URL             bucket_name, s3_prefix = parse_s3_url(s3_url_from_csv)              # 调用函数统计文件             actual_files = count_specific_files_in_s3(bucket_name, s3_prefix)              writer.writerow({                 'URL': s3_url_from_csv,                 'Actual Files': actual_files,                 'Expected Files': expected_files             })         except ValueError as ve:             print(f"Skipping row due to invalid S3 URL: {s3_url_from_csv} - {ve}")             writer.writerow({                 'URL': s3_url_from_csv,                 'Actual Files': 'Error: Invalid URL',                 'Expected Files': expected_files             })         except Exception as e:             print(f"An unexpected error occurred for URL {s3_url_from_csv}: {e}")             writer.writerow({                 'URL': s3_url_from_csv,                 'Actual Files': 'Error: ' + str(e),                 'Expected Files': expected_files             })  print(f"File counting complete. Results written to {output_csv_file}")

注意事项与最佳实践

  1. AWS 凭证配置: 确保您的运行环境已正确配置AWS凭证(例如,通过环境变量、~/.aws/credentials文件或IAM角色)。Boto3会自动查找这些凭证。
  2. IAM 权限: 用于执行脚本的IAM用户或角色必须拥有对目标S3桶的s3:ListBucket权限,以便能够列出桶中的对象。如果需要读取对象内容,还需要s3:GetObject权限,但在此计数场景中不需要。
  3. Prefix 的精确性: Prefix参数越精确,S3返回的对象数量就越少,从而提高效率。如果您的S3路径结构允许,尽量使用最长的公共前缀。
  4. 文件命名模式: count_specific_files_in_s3函数中的file_pattern_start和file_pattern_end可以根据实际需求调整。对于更复杂的文件名匹配,可以考虑使用Python的re模块(正则表达式)。
  5. 错误处理: 在实际生产环境中,应加入更健壮的错误处理机制,例如重试逻辑、详细日志记录等。
  6. 性能考量: 尽管boto3.resource会自动处理分页,但对于包含数百万甚至数十亿个对象的超大型桶,遍历所有对象仍可能耗时。如果仅需统计特定前缀下的文件,确保Prefix设置得尽可能具体。

总结

通过本教程,您应该掌握了如何使用Python和Boto3库高效地统计AWS S3存储桶中特定类型的文件。关键在于理解boto3.resource的优势、正确使用Prefix参数来过滤S3对象,以及在Python代码中对object.key进行精确的字符串匹配。这种方法不仅能避免手动分页的复杂性,还能确保在处理大量S3对象时获得准确的统计结果。

© 版权声明
THE END
喜欢就支持一下吧
点赞7 分享