pulsar 是一种高效的服务器到服务器消息系统,具有多租户和高性能等特点,最初由 yahoo 开发,现由 apache 软件基金会管理。它是 apache 的顶级项目,定位为下一代云原生分布式消息流平台,融合了消息传递、存储和轻量级函数计算功能,采用计算与存储分离的架构设计,支持多租户、持久化存储、跨区域数据复制,具备强一致性、高吞吐、低延迟和高扩展性等流数据存储特性,被视为云原生时代实时消息流传输、存储和计算的理想解决方案。
Pulsar 的特性包括:
- 单实例支持多个集群,支持跨机房的消息复制。
- 极低的发布和端到端延迟。
- 可扩展到超过一百万个 topic。
- 提供简单易用的客户端 API,支持 Java、Go、python 和 c++。
- 支持多种 topic 订阅模式,包括独占订阅、共享订阅、故障转移订阅和键共享订阅。
- 通过 Apache BookKeeper 提供持久化消息存储,确保消息传递的可靠性。
- Pulsar Functions 和 Pulsar IO 提供 serverless 计算框架和数据集成解决方案。
- 支持分层存储,可将旧数据从热存储卸载到冷/长期存储(如 S3、GCS)。
Pulsar 的架构主要包括以下组件:
- Broker:负责消息传输、Topic 管理和负载均衡,不存储消息,是无状态组件。
- Bookie:使用 Apache BookKeeper 组件,负责消息的持久化存储。
- Producer:生产者,封装并发送消息到 Broker。
- Consumer:消费者,通过订阅 Topic 消费消息并确认。Pulsar 还定义了 Reader 角色,允许从指定位置获取消息,无需确认。
- zookeeper:用于元数据存储和集群配置管理,包括租户和命名空间的一致性协调。
Pulsar 支持四种订阅模式:
- 独占(Exclusive)订阅:同一时间只有一个消费者可以消费数据,适用范围较小。
- 共享(Shared)订阅:多个消费者可以同时运行,消息按轮询方式分配,但无法保证消息顺序。
- 故障转移(Failover)订阅:在独占模式的基础上,允许启动多个消费者,当一个消费者失败时,其他消费者可以接管。
- 键共享(KeyShared)订阅:基于共享模式,消息按键分组,同组消息由同一个消费者有序消费。
下载和安装 Pulsar 2.9.1 版本后,可以在 linux 服务器上解压并启动单机版 Pulsar。使用命令行可以启动和终止 Pulsar 服务。
在 spring Boot 中集成 Pulsar 需要以下步骤:
-
引入 maven 依赖:
<dependency> <groupId>io.github.majusko</groupId> <artifactId>pulsar-java-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency>
-
配置 application.yml:
pulsar: service-url: pulsar://192.168.0.105:6650
-
创建 Pulsar 配置类:
@Configuration public class PulsarConfig { @Bean public ProducerFactory producerFactory() { return new ProducerFactory().addProducer("testTopic", String.class); } }
-
定义 Topic 名称常量类:
public class TopicName { private TopicName(){} public static final String TEST_TOPIC = "testTopic"; }
-
创建消息生产者类:
@Component public class PulsarProducer<T> { @Resource private PulsarTemplate<T> template; public void send(String topic, T message) { try { template.send(topic, message); } catch (PulsarClientException e) { e.printStackTrace(); } } }
-
创建消息消费者类:
@Component public class TestTopicPulsarConsumer { private static final Logger log = LoggerFactory.getLogger(TestTopicPulsarConsumer.class); @PulsarConsumer(topic = TopicName.TEST_TOPIC, subscriptionType = SubscriptionType.Shared, clazz = String.class) public void consume(String message) { log.info("PulsarRealConsumer content:{}", message); } }
-
创建 PulsarController 测试发送消息:
@RestController @RequestMapping("/pulsar") public class PulsarController { @Resource private PulsarProducer<String> pulsarProducer; @PostMapping(value = "/sendMessage") public CommonResponse<String> sendMessage(@RequestParam(name = "message") String message) { pulsarProducer.send(TopicName.TEST_TOPIC, message); return CommonResponse.success("done"); } }
-
定义公共响应体类:
public class CommonResponse<T> { private String code; private Boolean success; private T data; public static <T> CommonResponse<T> success(T t){ return new CommonResponse<>("200",true,t); } public CommonResponse(String code, Boolean success, T data) { this.code = code; this.success = success; this.data = data; } //getter、setter方法 }
启动项目后,可以使用 postman 测试消息发送和接收功能。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END