在处理一个需要与 Confluent 平台集成的大型项目时,我遇到了一个棘手的问题:如何确保在 Avro 序列化格式下,模式演变不会影响下游消费者。Confluent 开发的 Schema Registry 能够通过验证模式演变来解决这个问题,但遗憾的是,Confluent 并没有为 php 提供官方的 Avro SerDe 包。经过一番研究,我找到了 flix-tech/avro-serde-php 这个库,它不仅实现了 Confluent 的线格式,还集成了 FlixTech 的 Schema Registry 客户端,彻底解决了我的问题。
使用 composer 安装 flix-tech/avro-serde-php 非常简单,只需运行以下命令:
composer require 'flix-tech/avro-serde-php:^2.2'
接下来,我将介绍如何使用这个库来解决 Avro 序列化和反序列化的问题。
首先,需要创建一个缓存的 Schema Registry 客户端,以避免每次序列化或反序列化消息时都进行 http 请求:
use FlixTechSchemaRegistryApiRegistryCacheAvroObjectCacheAdapter; use FlixTechSchemaRegistryApiRegistryCachedRegistry; use FlixTechSchemaRegistryApiRegistryPromisingRegistry; use GuzzleHttpClient; $schemaRegistryClient = new CachedRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ), new AvroObjectCacheAdapter() );
然后,构建 RecordSerializer 实例,这是与库交互的主要方式,它提供了 encodeRecord 和 decodeMessage 方法来执行序列化和反序列化操作:
立即学习“PHP免费学习笔记(深入)”;
use FlixTechAvroSerializerObjectsRecordSerializer; $recordSerializer = new RecordSerializer( $schemaRegistry, [ RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false, RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false, ] );
使用 RecordSerializer 可以轻松地编码和解码消息。例如,编码记录:
$subject = 'my-topic-value'; $avroSchema = AvroSchema::parse('{"type": "string"}'); $record = 'Test message'; $encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record); // 发送到网络...
解码消息:
$record = $recordSerializer->decodeMessage($encodedBinaryAvro); echo $record; // 'Test message'
此外,flix-tech/avro-serde-php 还提供了多种 Schema Resolver 来管理 Avro 模式。例如,FileResolver 可以从文件中加载模式,CallableResolver 可以使用自定义函数来获取模式,而 DefinitionInterfaceResolver 则通过实现 HasSchemaDefinitionInterface 接口来解析模式。
如果你使用 symfony Serializer 组件,这个库还提供了相应的集成:
use FlixTechAvroSerializerIntegrationsSymfonySerializerAvroSerDeEncoder; use FlixTechAvroSerializerObjectsDefaultRecordSerializerFactory; use SymfonyComponentSerializerNormalizerGetSetMethodNormalizer; use SymfonyComponentSerializerSerializer; $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); $normalizer = new GetSetMethodNormalizer(); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]);
最后,这个库还提供了 Schema Builder 和 Schema Generator 功能,允许你使用 PHP 代码定义和生成 Avro 模式。
使用 flix-tech/avro-serde-php 库解决了我的 Avro 序列化和反序列化问题,使得我的项目能够顺利与 Confluent 平台集成。它的灵活性和强大功能大大提高了开发效率,确保了模式演变的兼容性。我强烈推荐任何需要在 PHP 中处理 Avro 序列化的开发者使用这个库。