本教程详细介绍了如何使用python和Boto3库高效统计AWS S3存储桶中符合特定命名模式的文件数量。文章重点讲解了Boto3客户端与资源对象的选择、Prefix参数的正确使用、以及如何处理S3对象列表的自动分页,并提供了实用的代码示例,帮助用户精确筛选和统计S3文件。
理解S3对象列表与Boto3
在AWS S3中,数据以对象(Object)的形式存储,每个对象都有一个唯一的键(Key),它包含了对象的完整路径和文件名。S3并没有传统文件系统中的“文件夹”概念,而是通过对象键的层级结构(例如folder1/folder2/file.txt)来模拟目录。使用Boto3库与S3交互时,选择正确的接口和理解参数至关重要。
Boto3 Client vs. Resource
Boto3提供了两种主要的接口来与AWS服务交互:client和resource。
- client (低级接口):提供与AWS API一对一的映射,功能更原始,需要手动处理分页、错误等。例如,s3.client(‘s3’).list_objects_v2()。
- resource (高级接口):在client之上封装了一层更高级、更Pythonic的抽象,通常更易于使用。它会自动处理分页,并提供更直观的对象模型。例如,s3.resource(‘s3’).Bucket(‘your-bucket’).objects.Filter()。
对于需要遍历大量S3对象并进行筛选的场景,推荐使用boto3.resource,因为它能自动处理S3 API的1000个对象限制(即分页),避免手动管理ContinuationToken。
Prefix参数的作用
在S3的list_objects_v2或objects.filter()方法中,Prefix参数用于指定一个字符串,只有对象键以此字符串开头的对象才会被返回。这是模拟“遍历文件夹”的关键。
立即学习“Python免费学习笔记(深入)”;
重要提示:Prefix参数应是S3桶内路径的一部分,不应包含S3 URL的协议头(s3://)或桶名。例如,如果S3 URL是s3://my-bucket/path/to/folder/,那么桶名是my-bucket,Prefix应该是path/to/folder/。如果Prefix被错误地设置为完整的S3 URL,将无法匹配到任何对象,导致计数为零。
实现S3文件统计逻辑
为了高效且准确地统计S3中特定文件(例如,名称以file_开头且以.ts结尾的视频分块文件),我们需要一个函数来执行以下步骤:
- 从给定的S3 URL中解析出桶名和前缀。
- 使用Boto3 Resource获取指定前缀下的所有对象。
- 遍历这些对象,并根据其键(object.key)进行精确的模式匹配。
1. 解析S3 URL为桶名和前缀
S3 URL通常格式为s3://
2. 构建文件统计函数
现在,我们可以创建一个函数来统计指定S3路径下符合条件的文件。这里我们将统计所有键以prefix开头,且包含file_并以.ts结尾的文件。
import boto3 def count_specific_files_in_s3(bucket_name, prefix, file_pattern_start='file_', file_pattern_end='.ts'): """ 统计S3桶中指定前缀下,符合特定命名模式的文件数量。 Args: bucket_name (str): S3桶的名称。 prefix (str): 要搜索的前缀路径(例如 'path/to/folder/')。 file_pattern_start (str): 文件名必须以此字符串开头(例如 'file_')。 file_pattern_end (str): 文件名必须以此字符串结尾(例如 '.ts')。 Returns: int: 符合条件的文件数量。 """ s3_resource = boto3.resource('s3') bucket = s3_resource.Bucket(bucket_name) count = 0 print(f"Checking folder: s3://{bucket_name}/{prefix}") try: # 使用filter(Prefix=prefix)获取指定路径下的所有对象 # boto3 resource会自动处理分页 for obj in bucket.objects.filter(Prefix=prefix): # object.key 是对象的完整键,例如 'path/to/folder/file_001.ts' # 确保我们只计算当前“目录”下的文件,而不是子目录中的文件 # 并且匹配特定的文件名模式 # 假设prefix是 'path/to/folder/' # 那么文件键应该是 'path/to/folder/file_XXX.ts' # 我们需要检查文件键是否以完整的prefix开头,并且在prefix之后的部分符合我们的文件名模式 # 获取对象键中,去除prefix后的相对路径/文件名 relative_key = obj.key[len(prefix):] if obj.key.startswith(prefix) else obj.key # 排除自身作为目录的情况 (例如 prefix是'foo/',obj.key是'foo/') # 排除子目录 (例如 prefix是'foo/',obj.key是'foo/bar/file.ts') # 只统计当前层级的文件 if '/' not in relative_key and relative_key.startswith(file_pattern_start) and relative_key.endswith(file_pattern_end): count += 1 # print(f"Matched file: {obj.key}") # 调试用 print(f"Actual chunks found: {count}") return count except Exception as e: print(f"Error Accessing S3 bucket {bucket_name} with prefix {prefix}: {e}") return 0 # 返回0表示发生错误或无法访问 # 示例用法 (假设你的AWS凭证已配置) # bucket_name = 'coursevideotesting' # prefix = 'Financial_Freedom_Course_Kannada/00_Course_Trailer_New_update/360p/' # actual_count = count_specific_files_in_s3(bucket_name, prefix) # print(f"Count for {prefix}: {actual_count}")
3. 整合CSV处理流程
现在,我们将上述功能集成到读取输入CSV、处理S3 URL并写入结果CSV的完整脚本中。
import csv import boto3 import re # 1. 解析S3 URL的辅助函数 def parse_s3_url(s3_url): match = re.match(r's3://([^/]+)/(.*)', s3_url) if match: bucket_name = match.group(1) prefix = match.group(2) # 确保前缀以斜杠结尾,如果它代表一个目录 if prefix and not prefix.endswith('/'): prefix += '/' return bucket_name, prefix else: raise ValueError(f"Invalid S3 URL format: {s3_url}") # 2. 统计S3中特定文件的函数 def count_specific_files_in_s3(bucket_name, prefix, file_pattern_start='file_', file_pattern_end='.ts'): s3_resource = boto3.resource('s3') bucket = s3_resource.Bucket(bucket_name) count = 0 print(f"Checking folder: s3://{bucket_name}/{prefix}") try: for obj in bucket.objects.filter(Prefix=prefix): # object.key 是完整的对象键,例如 'path/to/folder/file_001.ts' # 我们需要确保只统计当前层级的文件,而不是子目录中的文件 # 例如,如果prefix是'videos/qualityA/' # obj.key可能是 'videos/qualityA/file_001.ts' (要统计) # 或 'videos/qualityA/subfolder/file_001.ts' (不统计) # 提取对象键中,去除prefix后的相对路径/文件名 # 如果obj.key不以prefix开头,则跳过(理论上不会发生,因为filter已经处理了) if not obj.key.startswith(prefix): continue relative_key = obj.key[len(prefix):] # 检查relative_key是否包含斜杠,如果有,说明是子目录或子目录中的文件,跳过 # 并且检查文件名是否符合模式 if '/' not in relative_key and relative_key.startswith(file_pattern_start) and relative_key.endswith(file_pattern_end): count += 1 print(f"Actual chunks found: {count}") return count except Exception as e: print(f"Error accessing S3 bucket {bucket_name} with prefix {prefix}: {e}") return 0 # 发生错误时返回0 # --- 主脚本逻辑 --- # 输入和输出csv文件名称 input_csv_file = 'ldt_ffw_course_videos_temp.csv' # 替换为你的输入CSV文件 output_csv_file = 'file_count_result.csv' # 替换为你的输出CSV文件 # 确保输入CSV文件存在并包含正确的列名 # 假设输入CSV包含 'course_video_s3_url' 和 'course_video_ts_file_cnt' 列 # 读取URLs从输入CSV并检查文件数量 with open(input_csv_file, mode='r', encoding='utf-8') as infile, open(output_csv_file, mode='w', newline='', encoding='utf-8') as outfile: reader = csv.DictReader(infile) # 检查CSV列名是否存在 if 'course_video_s3_url' not in reader.fieldnames or 'course_video_ts_file_cnt' not in reader.fieldnames: raise ValueError(f"Input CSV must contain 'course_video_s3_url' and 'course_video_ts_file_cnt' columns. Found: {reader.fieldnames}") fieldnames = ['URL', 'Actual Files', 'Expected Files'] writer = csv.DictWriter(outfile, fieldnames=fieldnames) writer.writeheader() for row in reader: s3_url_from_csv = row['course_video_s3_url'] expected_files = int(row['course_video_ts_file_cnt']) try: # 解析S3 URL bucket_name, s3_prefix = parse_s3_url(s3_url_from_csv) # 调用函数统计文件 actual_files = count_specific_files_in_s3(bucket_name, s3_prefix) writer.writerow({ 'URL': s3_url_from_csv, 'Actual Files': actual_files, 'Expected Files': expected_files }) except ValueError as ve: print(f"Skipping row due to invalid S3 URL: {s3_url_from_csv} - {ve}") writer.writerow({ 'URL': s3_url_from_csv, 'Actual Files': 'Error: Invalid URL', 'Expected Files': expected_files }) except Exception as e: print(f"An unexpected error occurred for URL {s3_url_from_csv}: {e}") writer.writerow({ 'URL': s3_url_from_csv, 'Actual Files': 'Error: ' + str(e), 'Expected Files': expected_files }) print(f"File counting complete. Results written to {output_csv_file}")
注意事项与最佳实践
- AWS 凭证配置: 确保您的运行环境已正确配置AWS凭证(例如,通过环境变量、~/.aws/credentials文件或IAM角色)。Boto3会自动查找这些凭证。
- IAM 权限: 用于执行脚本的IAM用户或角色必须拥有对目标S3桶的s3:ListBucket权限,以便能够列出桶中的对象。如果需要读取对象内容,还需要s3:GetObject权限,但在此计数场景中不需要。
- Prefix 的精确性: Prefix参数越精确,S3返回的对象数量就越少,从而提高效率。如果您的S3路径结构允许,尽量使用最长的公共前缀。
- 文件命名模式: count_specific_files_in_s3函数中的file_pattern_start和file_pattern_end可以根据实际需求调整。对于更复杂的文件名匹配,可以考虑使用Python的re模块(正则表达式)。
- 错误处理: 在实际生产环境中,应加入更健壮的错误处理机制,例如重试逻辑、详细日志记录等。
- 性能考量: 尽管boto3.resource会自动处理分页,但对于包含数百万甚至数十亿个对象的超大型桶,遍历所有对象仍可能耗时。如果仅需统计特定前缀下的文件,确保Prefix设置得尽可能具体。
总结
通过本教程,您应该掌握了如何使用Python和Boto3库高效地统计AWS S3存储桶中特定类型的文件。关键在于理解boto3.resource的优势、正确使用Prefix参数来过滤S3对象,以及在Python代码中对object.key进行精确的字符串匹配。这种方法不仅能避免手动分页的复杂性,还能确保在处理大量S3对象时获得准确的统计结果。