在最高级别,单个 Pulsar 实例由一个或多个 Pulsar 集群组成。实例中的集群之间可以相互复制数据。
单个 Pulsar 集群由以下三部分组成:
- 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。
- 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储。
- A ZooKeeper cluster specific to that cluster handles coordination tasks between Pulsar clusters.
下图为一个 Pulsar 集群:
在更细粒度的实例级别, 有一个能访问到全部实例的ZooKeeper群集处理涉及多个pulsar集群的配置协调任务, 例如 异地复制。
Brokers
Pulsar的broker是一个无状态组件, 主要负责运行另外的两个组件:
- 一个 HTTP 服务器, 它暴露了 REST 系统管理接口以及在生产者和消费者之间进行 Topic查找的API。
- 一个调度分发器, 它是异步的TCP服务器,通过自定义 二进制协议应用于所有相关的数据传输。
出于性能的考虑, 通常从 managed ledger (ledger是Pulsar底层存储BookKeeper中的概念,相当于一种记录的集合) 缓存中调度消息, 除非 积压的消息超过这个缓存的大小。 如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。
最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并把这些条目用Java 客户端重新发布到其他区域
如何管理Pulsar Brokers, 请参考 brokers 指南
集群
一个Pulsar实例包含一个或者多个Pulsar集群。集群包括:
集群间可以通过异地复制进行消息同步
如何管理Pulsar集群,请参考clusters指南
元数据存储
Pulsar利用Apache Zookeeper进行元数据存储,集群配置和协调。在一个Pulsar实例中:
- 配置与仲裁存储: 存储租户,命名域和其他需要全局一致的配置项
- 每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的),等等
持久化存储
Pulsar提供有保证的应用消息运输。如果消息成功送达了Pulsar Broker,它一定会被送达至它的目的地
为了提供这种保证,未确认送达的消息需要持久化存储直到它们被确认送达 这种消息推送方式通常叫做持久化消息推送 在Pulsar内部,所有消息都被保存并同步N份,例如,2个服务器保存四份,每个服务器上面都有镜像的RAID存储
Apache BookKeeper
Pulsar用 Apache BookKeeper作为持久化存储。 BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:
- 能让Pulsar创建多个独立的日志,这种独立的日志就是ledgers. 随着时间的推移,Pulsar会为Topic创建多个ledgers。
- 为按条目复制的顺序数据提供了非常高效的存储。
- 保证了多系统挂掉时ledgers的读取一致性。
- 提供不同的Bookies之间均匀的IO分布的特性。
- 容量和吞吐量都能水平扩展。并且容量可以通过在集群内添加更多的Bookies立刻提升。
- Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。
除了消息数据,cursors也会被持久化入BookKeeper。 Cursors是消费端订阅消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。
目前,Pulsar支持持久消息存储。这可以通过topic名称中的persistent
字眼实现。这里有一个例子:
persistent://tenant/namespace/topic
Pulsar也支持临时消息( (non-persistent) )存储。
下图展示了brokers和bookies是如何交互的
Ledgers
Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。 Ledger的条目会被复制到多个bookies。 Ledgers本身有着非常简单的语义:
- Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。
- 当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,这个ledger只会以只读模式打开。
- 最后,当ledger中的条目不再有用的时候,整个legder可以被删除(ledger分布是跨Bookies的)。
Ledger读一致性
BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。
Managed ledgers
由于BookKeeper Ledgers提供了单一的日志抽象,在ledger的基础上我们开发了一个叫managed ledger的库,用以表示单个topic的存储层。 managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。
一个managed ledger在内部用多个BookKeeper ledgers保存数据,这么做有两个原因:
- 在故障之后,原有的某个ledger不能再写了,需要创建一个新的。
- ledger在所有cursors消费完它所保存的消息之后就可以被删除,这样可以实现ledgers的定期翻滚从头写。
日志存储
BookKeeper的日志文件包含事务日志。 在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 journalMaxSizeMB
参数配置)的时候,新的日志文件会被创建。
Pulsar proxy
Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers 。 然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址。 例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了。
Pulsar proxy提供了解决这个问题的方案,它可以作为集群中的所有brokers的统一网关。 如果你选择运行Pulsar Proxy(这是可选的),所有的客户端连接将会通过这个代理而不是直接与brokers通信。
为了性能和容错,你可以运行任意个Pulsar proxy。
架构上来看,Pulsar Proxy从ZooKeeper上面读取他所需要的所有信息。 当启动代理时,你只需要提供用于集群独有和实例范围的配置存储的ZooKeeper连接串。 Here’s an example:
$ bin/pulsar proxy \
--zookeeper-servers zk-0,zk-1,zk-2 \
--configuration-store-servers zk-0,zk-1,zk-2
Pulsar proxy 文档
如何使用Pulsar proxy,参考 Pulsar proxy 管理指南。
关于Pulsar proxy有一些比较重要的注意点:
- 连接客户端不需要为使用Pulsar proxy提供任何特定配置。 除了更新用于服务URL的IP之外,你不需要为现有的应用更新客户端配置(例如你在Pulsar proxy上层架设运行了负载均衡器)。
- Pulsar proxy支持TLS 加密 和 认证。
Service discovery
Clients connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar内部提供了服务发现的机制,你可以通过 配置Pulsar实例指南设置。
你也可以用你自己的服务发现系统。 如果你用你自己的系统,只需满足一个需求:当客户端发送一个HTTP请求,例如发到http://pulsar.us-west.example.com:8080
,客户端需要被重定向到某些所需的集群中活跃的broker,或者通过DNS,或者通过HTTP和IP重定向,或者其他机制。
下面这张图展示了Pulsar服务发现机制:
图中,Pulsar集群可以通过一个DNS名称寻址:pulsar-cluster.acme.com
。 例如Python客户端,可以像这样访问这个Pulsar集群:
from pulsar import Client
client = Client('pulsar://pulsar-cluster.acme.com:6650')