
本文深入探讨了如何通过扩展laravel websockets的默认处理器(handler),实现对客户端连接生命周期事件(如连接建立与断开)的精细化控制。我们将重点关注如何在这些事件中获取应用层上下文信息,例如用户id或关联的业务资源id,进而实现实时资源状态管理,如在用户打开订单时锁定订单,并在连接关闭时自动解锁,从而提升应用的用户体验和数据一致性。
在构建实时Web应用时,我们经常需要根据客户端的连接状态来管理服务器端的资源。例如,当用户在一个浏览器标签页中打开某个订单详情进行编辑时,我们可能希望暂时“锁定”该订单,防止其他用户同时修改,并在用户关闭该标签页或断开连接时自动“解锁”。laravel websockets包提供了一个强大的基础,但要实现这种复杂的业务逻辑,我们需要深入定制其连接处理器。
理解Laravel Websockets处理器与生命周期事件
Laravel Websockets基于Ratchet库,其核心是WebSocketHandler。这个处理器定义了处理WebSocket连接生命周期事件的方法:
- onOpen(ConnectionInterface $conn):当一个新的WebSocket连接建立时触发。
- onMessage(ConnectionInterface $from, $msg):当连接接收到客户端发送的消息时触发。
- onClose(ConnectionInterface $conn):当WebSocket连接关闭时触发。
- onError(ConnectionInterface $conn, Exception $e):当连接发生错误时触发。
默认的WebSocketHandler已经处理了Pusher协议的订阅、取消订阅等基础逻辑。然而,在onOpen和onClose方法中,我们直接获取到的只有ConnectionInterface对象,它包含了连接的基本信息(如资源ID),但通常不包含我们需要的应用层上下文,如当前登录的用户ID或用户正在操作的特定订单ID。
挑战:获取应用层上下文信息
要实现订单锁定/解锁的场景,我们需要在连接建立或关闭时知道是“哪个用户”的“哪个连接”与“哪个订单”相关联。直接从ConnectionInterface获取这些信息是困难的。
解决此问题的关键在于:在客户端订阅特定频道时,捕获并关联这些上下文信息。
当客户端(例如通过Laravel echo)订阅一个私有频道时,例如private-order.{order_id},这个频道名称本身就包含了我们需要的order_id。同时,私有频道订阅需要经过认证,这意味着在订阅成功时,我们也能确定是哪个用户发起的。
定制WebSocket处理器
为了实现我们的目标,我们需要创建一个自定义的WebSocket处理器,继承自BeyondCodeLaravelWebSocketsWebSocketsWebSocketHandler。
1. 创建自定义处理器
首先,在app/Websockets目录下(如果不存在,请创建)创建一个新的处理器类,例如CustomWebSocketHandler.php:
// app/Websockets/CustomWebSocketHandler.php namespace AppWebsockets; use BeyondCodeLaravelWebSocketsWebSocketsChannelsChannelManager; use BeyondCodeLaravelWebSocketsWebSocketsWebSocketHandler; use RatchetConnectionInterface; use SplObjectStorage; // 用于存储连接相关数据,适用于单服务器部署 use IlluminateSupportFacadesLog; // use IlluminateSupportFacadesredis; // 对于多服务器部署,推荐使用redis class CustomWebSocketHandler extends WebSocketHandler { /** * 存储连接到其订阅的频道列表的映射。 * @var SplObjectStorage<ConnectionInterface, array<string>> */ protected SplObjectStorage $connectionChannels; /** * 存储连接到其应用层上下文的映射 (例如: user_id, 关联的资源ID等)。 * @var SplObjectStorage<ConnectionInterface, array<string, mixed>> */ protected SplObjectStorage $connectionContext; public function __construct(ChannelManager $channelManager) { parent::__construct($channelManager); $this->connectionChannels = new SplObjectStorage(); $this->connectionContext = new SplObjectStorage(); } /** * 当新的WebSocket连接建立时触发。 */ public function onOpen(ConnectionInterface $connection) { parent::onOpen($connection); Log::info("Connection opened: {$connection->resourceId}"); // 初始化该连接的频道和上下文存储 $this->connectionChannels->attach($connection, []); $this->connectionContext->attach($connection, []); // 在此阶段,通常还没有明确的应用层上下文(如用户ID或订单ID)。 // 这些信息通常在订阅私有频道后才能获取。 } /** * 当连接接收到客户端发送的消息时触发。 * 我们在此拦截 'pusher:subscribe' 消息以获取频道信息。 */ public function onMessage(ConnectionInterface $connection, $msg) { $message = json_decode($msg, true); // 检查是否是订阅频道的消息 if (isset($message['event']) && $message['event'] === 'pusher:subscribe') { $channelName = $message['data']['channel'] ?? null; if ($channelName) { // 将频道添加到该连接的订阅列表中 if (!$this->connectionChannels->contains($connection)) { $this->connectionChannels->attach($connection, []); } $channels = $this->connectionChannels[$connection]; if (!in_array($channelName, $channels)) { $channels[] = $channelName; $this->connectionChannels[$connection] = $channels; Log::info("Connection {$connection->resourceId} subscribed to channel: {$channelName}"); // 针对私有频道,Laravel Websockets会在认证成功后将用户对象存储在 ConnectionInterface->app->user 中 $user = $connection->app->user ?? null; if ($user) { $context = $this->connectionContext[$connection]; $context['user_id'] = $user->id; $this->connectionContext[$connection] = $context; Log::info("Connection {$connection->resourceId} associated with user ID: {$user->id}"); } // 如果是特定资源频道 (例如 'private-order.{order_id}') if (str_starts_with($channelName, 'private-order.')) { $orderId = (int) substr($channelName, strlen('private-order.')); $this->lockOrder($orderId, $connection->resourceId, $user->id ?? null); // 也可以将订单ID存储到连接上下文中 $context = $this->connectionContext[$connection]; if (!isset($context['order_ids'])) { $context['order_ids'] = []; } $context['order_ids'][] = $orderId; $this->connectionContext[$connection] = $context; } } } } // 务必调用父类的 onMessage 方法,以确保标准的WebSocket操作(如实际的频道订阅处理)得到执行 parent::onMessage($connection, $msg); } /** * 当WebSocket连接关闭时触发。 * 我们在此执行资源解锁和清理工作。 */ public function onClose(ConnectionInterface $connection) { Log::info("Connection closed: {$connection->resourceId}"); // 获取该连接订阅的所有频道,并执行解锁逻辑 if ($this->connectionChannels->contains($connection)) { $channels = $this->connectionChannels[$connection]; foreach ($channels as $channelName) { // 如果是特定资源频道,则执行解锁操作 if (str_starts_with($channelName, 'private-order.')) { $orderId = (int) substr($channelName, strlen('private-order.')); $this->unlockOrder($orderId, $connection->resourceId); }
以上就是定制Laravel Websockets连接生命周期与状态管理实践的详细内容,更多请关注php中文网其它相关文章!