并行处理视频流:使用 PySpark 进行大规模视频分析

并行处理视频流:使用 PySpark 进行大规模视频分析

本文档介绍了如何使用 Pyspark 并行处理多个视频文件,并进行人脸识别等视频分析任务。我们将探讨如何利用 Spark 的分布式计算能力,高效地从视频中提取帧,检测人脸,并进行人脸追踪。本文提供了详细的代码示例和步骤,帮助读者理解和应用 PySpark 进行大规模视频处理。

环境配置

首先,确保你的环境中安装了必要的 python 库和 ffmpeg

  1. 安装 Python 库:

    pip install ffmpeg-python pip install face-recognition conda install -c conda-forge opencv
  2. 安装 FFmpeg:

    确保你的系统上安装了 FFmpeg。具体的安装方法取决于你的操作系统。例如,在 ubuntu 上可以使用以下命令:

    sudo apt-get update sudo apt-get install ffmpeg

PySpark 代码实现

以下代码展示了如何使用 PySpark 并行读取视频文件,提取帧,并进行人脸检测和追踪。

from pyspark import SQLContext, SparkConf, SparkContext from pyspark.sql import SparkSession import pyspark.sql.functions as F  # 配置 Spark conf = SparkConf().setAppName("myApp").setMaster("local[40]") spark = SparkSession.builder.master("local[40]").config("spark.driver.memory", "30g").getOrCreate()  sc = spark.sparkContext sqlContext = SQLContext(sc)  import cv2 import os import uuid import ffmpeg import subprocess import numpy as np  from scipy.optimize import linear_sum_assignment import pyspark.sql.functions as F from pyspark.sql import Row from pyspark.sql.types import (StructType, StructField,                                IntegerType, FloatType,                                ArrayType, BinaryType,                                MapType, DoubleType, StringType)  from pyspark.sql.window import Window from pyspark.ml.feature import StringIndexer from pyspark.sql import Row, DataFrame, SparkSession  import pathlib  # 指定视频文件目录 input_dir = "../data/video_files/faces/" pathlist = list(pathlib.Path(input_dir).glob('*.mp4')) pathlist = [Row(str(ele)) for ele in pathlist]  # 创建 DataFrame column_name = ["video_uri"] df = sqlContext.createDataFrame(data=pathlist, schema=column_name)  print("Initial dataframe") df.show(10, truncate=False)  # 定义视频元数据 Schema video_metadata = StructType([     StructField("width", IntegerType(), False),     StructField("height", IntegerType(), False),     StructField("num_frames", IntegerType(), False),     StructField("duration", FloatType(), False) ])  # 定义 Shot Schema shots_schema = ArrayType(     StructType([         StructField("start", FloatType(), False),         StructField("end", FloatType(), False)     ]))  # 定义 UDF:提取视频元数据 @F.udf(returnType=video_metadata) def video_probe(uri):     probe = ffmpeg.probe(uri, threads=1)     video_stream = next(         (             stream             for stream in probe["streams"]             if stream["codec_type"] == "video"         ),         None,     )     width = int(video_stream["width"])     height = int(video_stream["height"])     num_frames = int(video_stream["nb_frames"])     duration = float(video_stream["duration"])     return (width, height, num_frames, duration)  # 定义 UDF:提取视频帧 @F.udf(returnType=ArrayType(BinaryType())) def video2images(uri, width, height,                  sample_rate: int = 5,                  start: float = 0.0,                  end: float = -1.0,                  n_channels: int = 3):     """     Uses FFmpeg filters to extract image byte arrays     and sampled & localized to a segment of video in time.     """     video_data, _ = (         ffmpeg.input(uri, threads=1)         .output(             "pipe:",             format="rawvideo",             pix_fmt="rgb24",             ss=start,             t=end - start,             r=1 / sample_rate,         ).run(capture_stdout=True))     img_size = height * width * n_channels     return [video_data[idx:idx + img_size] for idx in range(0, len(video_data), img_size)]  # 添加视频元数据列 df = df.withColumn("metadata", video_probe(F.col("video_uri"))) print("With Metadata") df.show(10, truncate=False)  # 提取视频帧 df = df.withColumn("frame", F.explode(     video2images(F.col("video_uri"), F.col("metadata.width"), F.col("metadata.height"), F.lit(1), F.lit(0.0),                  F.lit(5.0))))  # 定义人脸检测相关 Schema 和 UDF box_struct = StructType(     [         StructField("xmin", IntegerType(), False),         StructField("ymin", IntegerType(), False),         StructField("xmax", IntegerType(), False),         StructField("ymax", IntegerType(), False)     ] )  def bbox_helper(bbox):     top, right, bottom, left = bbox     bbox = [top, left, bottom, right]      return list(map(lambda x: max(x, 0), bbox))  @F.udf(returnType=ArrayType(box_struct)) def face_detector(img_data, width=1920, height=1080, n_channels=3):     img = np.frombuffer(img_data, np.uint8).reshape(height, width, n_channels)     faces = face_recognition.face_locations(img)     return [bbox_helper(f) for f in faces]  # 进行人脸检测 df = df.withColumn("faces", face_detector(F.col("frame"), F.col("metadata.width"), F.col("metadata.height")))  # 定义人脸追踪相关 Schema 和 UDF annot_schema = ArrayType(     StructType(         [             StructField("bbox", box_struct, False),             StructField("tracker_id", StringType(), False),         ]     ) )  def bbox_iou(b1, b2):     L = list(zip(b1, b2))     left, top = np.max(L, axis=1)[:2]     right, bottom = np.min(L, axis=1)[2:]     if right < left or bottom < top:         return 0     b_area = lambda b: (b[2] - b[0]) * (b[3] - b[1])     inter_area = b_area([left, top, right, bottom])     b1_area, b2_area = b_area(b1), b_area(b2)     iou = inter_area / float(b1_area + b2_area - inter_area)     return iou  @F.udf(returnType=MapType(IntegerType(), IntegerType())) def tracker_match(trackers, detections, bbox_col="bbox", threshold=0.3):     """     Match Bounding Boxes across successive image frames.     """     from scipy.optimize import linear_sum_assignment      similarity = bbox_iou     if not trackers or not detections:         return {}     if len(trackers) == len(detections) == 1:         if (                 similarity(trackers[0][bbox_col], detections[0][bbox_col])                 >= threshold         ):             return {0: 0}      sim_mat = np.array(         [             [                 similarity(tracked[bbox_col], detection[bbox_col])                 for tracked in trackers             ]             for detection in detections         ],         dtype=np.float32,     )      matched_idx = linear_sum_assignment(-sim_mat)     matches = []     for m in matched_idx:         try:             if sim_mat[m[0], m[1]] >= threshold:                 matches.append(m.reshape(1, 2))         except:             pass      if len(matches) == 0:         return {}     else:         matches = np.concatenate(matches, axis=0, dtype=int)      rows, cols = zip(*np.where(matches))     idx_map = {cols[idx]: rows[idx] for idx in range(len(rows))}     return idx_map  @F.udf(returnType=ArrayType(box_struct)) def OFMotionModel(frame, prev_frame, bboxes, height, width):     if not prev_frame:         prev_frame = frame     gray = cv2.cvtColor(np.frombuffer(frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)     prev_gray = cv2.cvtColor(np.frombuffer(prev_frame, np.uint8).reshape(height, width, 3), cv2.COLOR_BGR2GRAY)      inst = cv2.DISOpticalFlow.create(cv2.DISOPTICAL_FLOW_PRESET_MEDIUM)     inst.setUseSpatialPropagation(False)      flow = inst.calc(prev_gray, gray, None)      h, w = flow.shape[:2]     shifted_boxes = []     for box in bboxes:         xmin, ymin, xmax, ymax = box         avg_y = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 0])         avg_x = np.mean(flow[int(ymin):int(ymax), int(xmin):int(xmax), 1])          shifted_boxes.append(             {"xmin": int(max(0, xmin + avg_x)), "ymin": int(max(0, ymin + avg_y)), "xmax": int(min(w, xmax + avg_x)),              "ymax": int(min(h, ymax + avg_y))})     return shifted_boxes  def match_annotations(iterator, segment_id="video_uri", id_col="tracker_id"):     """     Used by mapPartitions to iterate over the small chunks of our hierarchically-organized data.     """      matched_annots = []     for idx, data in enumerate(iterator):         data = data[1]         if not idx:             old_row = {idx: uuid.uuid4() for idx in range(len(data[1]))}             old_row[segment_id] = data[0]             pass         annots = []         curr_row = {segment_id: data[0]}         if old_row[segment_id] != curr_row[segment_id]:             old_row = {}         if data[2] is not None:             for ky, vl in data[2].items():                 detection = data[1][vl].asDict()                 detection[id_col] = old_row.get(ky, uuid.uuid4())                 curr_row[vl] = detection[id_col]                 annots.append(Row(**detection))         matched_annots.append(annots)         old_row = curr_row     return matched_annots  def track_detections(df, segment_id="video_uri", frames="frame", detections="faces", optical_flow=True):     id_col = "tracker_id"     frame_window = Window().orderBy(frames)     value_window = Window().orderBy("value")     annot_window = Window.partitionBy(segment_id).orderBy(segment_id, frames)     indexer = StringIndexer(inputCol=segment_id, outputCol="vidIndex")      # adjust detections w/ optical flow     if optical_flow:         df = (             df.withColumn("prev_frames", F.lag(F.col(frames)).over(annot_window))             .withColumn(detections, OFMotionModel(F.col(frames), F.col("prev_frames"), F.col(detections), F.col("metadata.height"), F.col("metadata.width")))         )      df = (         df.select(segment_id, frames, detections)         .withColumn("bbox", F.explode(detections))         .withColumn(id_col, F.lit(""))         .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))         .groupBy(segment_id, frames, detections)         .agg(F.collect_list("trackables").alias("trackables"))         .withColumn(             "old_trackables", F.lag(F.col("trackables")).over(annot_window)         )         .withColumn(             "matched",             tracker_match(F.col("trackables"), F.col("old_trackables")),         )         .withColumn("frame_index", F.row_number().over(frame_window))     )      df = (         indexer.fit(df)         .transform(df)         .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))     )     unique_ids = df.select("vidIndex").distinct().count()     matched = (         df.select("vidIndex", segment_id, "trackables", "matched")         .rdd.map(lambda x: (x[0], x[1:]))         .partitionBy(unique_ids, lambda x: int(x[0]))         .mapPartitions(match_annotations)     )     matched_annotations = sqlContext.createDataFrame(matched, annot_schema).withColumn("value_index",                                                                                        F.row_number().over(                                                                                            value_window))      return (         df.join(matched_annotations, F.col("value_index") == F.col("frame_index"))         .withColumnRenamed("value", "trackers_matched")         .withColumn("tracked", F.explode(F.col("trackers_matched")))         .select(             segment_id,             frames,             detections,             F.col("tracked.{}".format("bbox")).alias("bbox"),             F.col("tracked.{}".format(id_col)).alias(id_col),         )         .withColumn(id_col, F.sha2(F.concat(F.col(segment_id), F.col(id_col)), 256))         .withColumn("tracked_detections", F.struct([F.col("bbox"), F.col(id_col)]))         .groupBy(segment_id, frames, detections)         .agg(F.collect_list("tracked_detections").alias("tracked_detections"))         .orderBy(segment_id, frames, detections)     )  # 定义 DetectionTracker transformer from pyspark import keyword_only from pyspark.ml.pipeline import Transformer from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param  class DetectionTracker(Transformer, HasInputCol, HasOutputCol):     """Detect and track."""      @keyword_only     def __init__(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):         """Initialize."""         super(DetectionTracker, self).__init__()         self.framesCol = Param(self, "framesCol", "Column containing frames.")         self.detectionsCol = Param(self, "detectionsCol", "Column containing detections.")         self.optical_flow = Param(self, "optical_flow", "Use optical flow for tracker correction. Default is False")         self._setDefault(framesCol="frame", detectionsCol="faces", optical_flow=False)         kwargs = self._input_kwargs         self.setParams(**kwargs)      @keyword_only     def setParams(self, inputCol=None, outputCol=None, framesCol=None, detectionsCol=None, optical_flow=None):         """Get params."""         kwargs = self._input_kwargs         return self._set(**kwargs)      def setFramesCol(self, value):         """Set framesCol."""         return self._set(framesCol=value)      def getFramesCol(self):         """Get framesCol."""         return self.getOrDefault(self.framesCol)      def setDetectionsCol(self, value):         """Set detectionsCol."""         return self._set(detectionsCol=value)      def getDetectionsCol(self):         """Get detectionsCol."""         return self.getOrDefault(self.detectionsCol)      def setOpticalflow(self, value):         """Set optical_flow."""         return self._set(optical_flow=value)      def getOpticalflow(self):         """Get optical_flow."""         return self.getOrDefault(self.optical_flow)      def _transform(self, dataframe):         """Do transformation."""         input_col = self.getInputCol()         output_col = self.getOutputCol()         frames_col = self.getFramesCol()         detections_col = self.getDetectionsCol()         optical_flow = self.getOpticalflow()          id_col = "tracker_id"         frame_window = Window().orderBy(frames_col)         value_window = Window().orderBy("value")         annot_window = Window.partitionBy(input_col).orderBy(input_col, frames_col)         indexer = StringIndexer(inputCol=input_col, outputCol="vidIndex")          # adjust detections w/ optical flow         if optical_flow:             dataframe = (                 dataframe.withColumn("prev_frames", F.lag(F.col(frames_col)).over(annot_window))                 .withColumn(detections_col,                             OFMotionModel(F.col(frames_col), F.col("prev_frames"), F.col(detections_col)))             )          dataframe = (             dataframe.select(input_col, frames_col, detections_col)             .withColumn("bbox", F.explode(detections_col))             .withColumn(id_col, F.lit(""))             .withColumn("trackables", F.struct([F.col("bbox"), F.col(id_col)]))             .groupBy(input_col, frames_col, detections_col)             .agg(F.collect_list("trackables").alias("trackables"))             .withColumn(                 "old_trackables", F.lag(F.col("trackables")).over(annot_window)             )             .withColumn(                 "matched",                 tracker_match(F.col("trackables"), F.col("old_trackables")),             )             .withColumn("frame_index", F.row_number().over(frame_window))         )          dataframe = (             indexer.fit(dataframe)             .transform(dataframe)             .withColumn("vidIndex", F.col("vidIndex").cast(StringType()))         )          unique_ids = dataframe.select("vidIndex").distinct().count()         matched = (             dataframe.select("vidIndex", input_col, "trackables", "matched")             .rdd.map(lambda x: (x[0], x[1:]))             .partitionBy(unique_ids, lambda x: int(x[0]))             .mapPartitions(match_annotations)         )          matched_annotations = sqlContext.createDataFrame(matched, annot_schema).withColumn("value_index",                                                                                            F.row_number().over(                                                                                                value_window))          return (             dataframe.join(matched_annotations, F.col("value_index") == F.col("frame_index"))             .withColumnRenamed("value", "trackers_matched")             .withColumn("tracked", F.explode(F.col("trackers_matched")))             .select(                 input_col,                 frames_col,                 detections_col,                 F.col("tracked.{}".format("bbox")).alias("bbox"),                 F.col("tracked.{}".format(id_col)).alias(id_col),             )             .withColumn(id_col, F.sha2(F.concat(F.col(input_col), F.col(id_col)), 256))             .withColumn(output_col, F.struct([F.col("bbox"), F.col(id_col)]))             .groupBy(input_col, frames_col, detections_col)             .agg(F.collect_list(output_col).alias(output_col))             .orderBy(input_col, frames_col, detections_col)         )  # 使用 DetectionTracker detectTracker = DetectionTracker(inputCol="video_uri", outputCol="tracked_detections") print(type(detectTracker))  detectTracker.transform(df) final = track_detections(df)  print("Final dataframe") final.select("tracked_detections").show(100, truncate=False)

代码解释

  1. Spark 配置:

    • 配置 SparkSession,设置 App 名称和 Master 节点。
    • 增加 spark.driver.memory 配置,避免内存不足。
  2. 视频元数据提取:

    • 使用 FFmpeg 提取视频的宽度、高度、帧数和时长。
    • video_probe UDF 用于获取视频元数据。
  3. 视频帧提取:

    • 使用 FFmpeg 提取视频帧,并将其转换为图像字节数组。
    • video2images UDF 用于提取视频帧。
  4. 人脸检测:

    • 使用 face_recognition 库检测视频帧中的人脸。
    • face_detector UDF 用于检测人脸,并返回人脸的边界框。
  5. 人脸追踪:

    • 使用光流法(Optical Flow)和匈牙利算法(Hungarian Algorithm)进行人脸追踪。
    • OFMotionModel UDF 用于使用光流法预测下一帧的人脸位置。
    • tracker_match UDF 用于匹配相邻帧中的人脸。
    • match_annotations 函数用于在分区中匹配人脸。
    • track_detections 函数用于整合人脸检测和追踪结果。
  6. DetectionTracker Transformer:

    • 将人脸检测和追踪逻辑封装成一个 Transformer,方便在 Spark ML Pipeline 中使用。

注意事项

  • 内存配置: 视频处理通常需要大量的内存。请根据实际情况调整 spark.driver.memory 和 spark.executor.memory 参数。
  • FFmpeg 安装: 确保 FFmpeg 安装正确,并且可以在系统中访问。
  • 视频文件路径: 确保视频文件路径正确,并且 Spark 可以访问这些文件。
  • UDF 性能: UDF 的性能可能不如 Spark 内置函数。在实际应用中,尽量使用 Spark 内置函数优化性能。
  • 并行度: 根据集群的规模和视频文件的大小,调整 Spark 的并行度,以充分利用集群的计算资源。

总结

本文档提供了一个使用 PySpark 并行处理视频文件的完整示例,包括视频元数据提取、视频帧提取、人脸检测和追踪。通过使用 Spark 的分布式计算能力,我们可以高效地处理大规模的视频数据,并进行各种视频分析任务。在实际应用中,可以根据具体需求调整代码,例如修改人脸检测算法、调整光流法参数等。

© 版权声明
THE END
喜欢就支持一下吧
点赞11 分享