Pulsar中间件入门学习

pulsar 是一种高效的服务器到服务器消息系统,具有多租户和高性能等特点,最初由 yahoo 开发,现由 apache 软件基金会管理。它是 apache 的顶级项目,定位为下一代云原生分布式消息流平台,融合了消息传递、存储和轻量级函数计算功能,采用计算与存储分离的架构设计,支持多租户、持久化存储、跨区域数据复制,具备强一致性、高吞吐、低延迟和高扩展性等流数据存储特性,被视为云原生时代实时消息流传输、存储和计算的理想解决方案。

Pulsar 的特性包括:

  • 单实例支持多个集群,支持跨机房的消息复制。
  • 极低的发布和端到端延迟。
  • 可扩展到超过一百万个 topic。
  • 提供简单易用的客户端 API,支持 Java、Go、pythonc++
  • 支持多种 topic 订阅模式,包括独占订阅、共享订阅、故障转移订阅和键共享订阅。
  • 通过 Apache BookKeeper 提供持久化消息存储,确保消息传递的可靠性。
  • Pulsar Functions 和 Pulsar IO 提供 serverless 计算框架和数据集成解决方案。
  • 支持分层存储,可将旧数据从热存储卸载到冷/长期存储(如 S3、GCS)。

Pulsar 的架构主要包括以下组件:

  • Broker:负责消息传输、Topic 管理和负载均衡,不存储消息,是无状态组件。
  • Bookie:使用 Apache BookKeeper 组件,负责消息的持久化存储
  • Producer:生产者,封装并发送消息到 Broker。
  • Consumer:消费者,通过订阅 Topic 消费消息并确认。Pulsar 还定义了 Reader 角色,允许从指定位置获取消息,无需确认。
  • zookeeper:用于元数据存储和集群配置管理,包括租户和命名空间的一致性协调。

Pulsar 支持四种订阅模式:

  • 独占(Exclusive)订阅:同一时间只有一个消费者可以消费数据,适用范围较小。
  • 共享(Shared)订阅:多个消费者可以同时运行,消息按轮询方式分配,但无法保证消息顺序。
  • 故障转移(Failover)订阅:在独占模式的基础上,允许启动多个消费者,当一个消费者失败时,其他消费者可以接管。
  • 键共享(KeyShared)订阅:基于共享模式,消息按键分组,同组消息由同一个消费者有序消费。

下载和安装 Pulsar 2.9.1 版本后,可以在 linux 服务器上解压并启动单机版 Pulsar。使用命令行可以启动和终止 Pulsar 服务。

spring Boot 中集成 Pulsar 需要以下步骤:

  1. 引入 maven 依赖

    <dependency>     <groupId>io.github.majusko</groupId>     <artifactId>pulsar-java-spring-boot-starter</artifactId>     <version>1.1.0</version> </dependency>
  2. 配置 application.yml

    pulsar:   service-url: pulsar://192.168.0.105:6650
  3. 创建 Pulsar 配置类

    @Configuration public class PulsarConfig {     @Bean     public ProducerFactory producerFactory() {         return new ProducerFactory().addProducer("testTopic", String.class);     } }
  4. 定义 Topic 名称常量

    public class TopicName {     private TopicName(){}     public static final String TEST_TOPIC = "testTopic"; }
  5. 创建消息生产者类

    @Component public class PulsarProducer<T> {     @Resource     private PulsarTemplate<T> template;     public void send(String topic, T message) {         try {             template.send(topic, message);         } catch (PulsarClientException e) {             e.printStackTrace();         }     } }
  6. 创建消息消费者类

    @Component public class TestTopicPulsarConsumer {     private static final Logger log = LoggerFactory.getLogger(TestTopicPulsarConsumer.class);     @PulsarConsumer(topic = TopicName.TEST_TOPIC, subscriptionType = SubscriptionType.Shared, clazz = String.class)     public void consume(String message) {         log.info("PulsarRealConsumer content:{}", message);     } }
  7. 创建 PulsarController 测试发送消息

    @RestController @RequestMapping("/pulsar") public class PulsarController {     @Resource     private PulsarProducer<String> pulsarProducer;     @PostMapping(value = "/sendMessage")     public CommonResponse<String> sendMessage(@RequestParam(name = "message") String message) {         pulsarProducer.send(TopicName.TEST_TOPIC, message);         return CommonResponse.success("done");     } }
  8. 定义公共响应体类

    public class CommonResponse<T> {     private String code;     private Boolean success;     private T data;     public static <T> CommonResponse<T> success(T t){         return new CommonResponse<>("200",true,t);     }     public CommonResponse(String code, Boolean success, T data) {         this.code = code;         this.success = success;         this.data = data;     }     //getter、setter方法 }

启动项目后,可以使用 postman 测试消息发送和接收功能。

Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习

© 版权声明
THE END
喜欢就支持一下吧
点赞6 分享