本篇我们将通过 Swoole 实现一个自带连接池的 MySQL 查询器: 完整项目地址:[协程版 MySQL 查询器](https://github.com/linvanda/mysql) (注:该项目并非示例项目,而是生产可用的,已经在公司内部稳定使用。) 我们希望通过统一的入口对外提供服务,将复杂性隐藏在内部。该统一入口由查询模块提供。该模块由查询器和SQL 构造器构成,其中查询器作为外界唯一入口,而构造器是一个 Trait,因为这样可以让外界通过查询器入口直接使用构造器提供的 SQL 组装功能。 查询器通过事务模块执行 SQL。这里的事务有两个层面含义:数据库操作的事务性(显式或隐式事务,由 事务模块需要通过数据库连接对象执行具体的 SQL。连接对象由连接池模块提供。 连接池模块维护(创建、回收、销毁)数据库连接对象,具体是通过数据库连接模块的连接生成器生成新数据库连接。 模块之间依赖于接口而非具体实现:查询模块依赖事务模块的 下面,我们分模块具体讲解。 由查询模块对外提供统一的使用入口。查询模块由两部分构成:查询器和 SQL 构造器。为了让调用方可以直接通过查询器来构造 SQL(而不用先实例化一个构造器构造 SQL 然后传给查询器),我将构造器设计成 Trait 供查询器 Query 使用。 我们先看看查询器类 Query: 该入口类做了以下几件事情: 我们再简单看下 Builder 的实现: 构造器主要提供和 SQL 子句对应的方法来构造和编译 SQL,并提供对原生 SQL 的支持。 事务是集中管理 SQL 执行上下文的地方,所有的 SQL 都是在事务中执行的(没有调 begin() 则是隐式事务)。 我们的查询器是协程安全的,即一个 Query 实例可以在多个协程中并发执行事务而不会相互影响。协程安全性是通过事务模块保证的,这里需要处理两个维度的“事务”:数据库维度和协程维度。不但需要保证数据库事务的完整执行,还要保证多个协程间的 SQL 执行不会相互影响。 我们先看一个多协程并发执行事务的例子(在两个子协程中使用同一个 Query 实例执行事务:先从数据库查询用户信息,然后更新姓名): 上面代码执行步骤如图: 在上图两个协程不断切换过程中,各自的事务是在独立执行的,互不影响。 现实中,我们会在仓储中使用查询器,每个仓储持有一个查询器实例,而仓储是单例模式,多协程共享的,因而查询器也是多协程共享的。如下: 事务模块是如何实现协程并发事务的隔离性呢?我们用协程上下文 我们先看看协程上下文类: 协程上下文内部通过 $container 数组维护每个协程的数据。该类实现了 ArrayAccess 接口,可以通过下标访问,如: 再看看事务。 事务接口定义: 上面接口定义了事务管理器的主要工作:开启事务、执行 SQL、提交/回滚事务以及和本次事务执行相关的信息。 我们再来看看它的实现类 该类中,一次 SQL 执行(无论是显式事务还是隐式事务)的步骤: begin -> exec -> commit/rollback 优化: 类 一种优化方案是,在连接模块提供一个连接管理类供外部(事务模块)取还连接: 将 连接管理器 连接池模块由 IPool 接口和 CoPool 实现类组成。 连接池模块和连接模块之间的关系比较巧妙(上面优化后的方案)。从高层(接口层面)来说,连接池模块依赖连接模块:连接池操作(取还) 经过上面的分析我们得出,连接池模块和连接模块具有较强的耦合性,连接模块可以对外屏蔽掉连接池模块的存在,因而在设计上我们可以将这两个模块看成一个大模块放在一个目录下面,在该目录下再细分成两个内部模块即可。 我们先看看连接池接口。 从上面接口定义我们发现,该连接池并非通用连接池,而是针对数据库连接做的定制( 实现类 这里有几点需要注意: 后面在连接模块会讲解连接生成器,到时我们会知道一个连接池实例到底维护的是哪些连接对象。 连接模块负责和数据库建立连接并发出 SQL 请求,其底层使用 Swoole 的 MySQL 驱动。连接模块由连接对象和连接生成器构成,对外暴露 (在我们的优化版本中,一方面引入了连接管理器 连接对象的实现比较简单,我们重点看下 这里重点关注下查询的时候断线重连机制:先尝试执行 SQL,如果返回连接失败,则尝试重新连接。 我们再看看连接生成器: 该生成器是针对一主多从数据库架构的(包括未走读写分离的),如果使用是是其他数据库架构(如多主架构),则创建其他生成器即可。 同一套读写配置使用同一个生成器,对应的连接池也是同一个。 使用工厂组装查询器实例: 至此,整个查询器的编写、创建和使用就完成了。 优化版 UML 图:需求分析
使用示例
$query->select(['uid', 'name']) ->from('users u') ->join('auth_users au', "u.uid=au.uid") ->where(['uid' => $uid]) ->groupBy("u.phone") ->having("count(u.phone)>1") ->orderBy("u.uid desc") ->limit(10, 0) ->list();
$query->insert('users') ->values( [ [ 'name' => 'linvanda', 'phone' => '18687664562', 'nickname' => '林子', ], [ 'name' => 'xiake', 'phone' => '18989876543', 'nickname' => '侠客', ], ] )->execute();// 这里是批量插入,不需要批量插入的话,传入一维数组即可 // 延迟插入 $query->insert('users') ->delayed() ->values( [ 'name' => 'linvanda', 'phone' => '18687664562', 'nickname' => '林子', ] )->execute();
$query->update('users u') ->join('auth_users au', "u.uid=au.uid") ->set(['u.name' => '粽子']) ->where("u.uid=:uid", ['uid' => 123]) ->execute();
$query->delete('users') ->where("uid=:uid", ['uid' => 123]) ->execute();
$query->begin(); $query->update('users u') ->join('auth_users au', "u.uid=au.uid") ->set(['u.name' => '粽子']) ->where("u.uid=:uid", ['uid' => 123]) ->execute(); ... $query->commit();
模块设计
CoTransaction
类保障),以及多协程下的执行环境隔离性(由 TContext
类保障)。ITransaction
接口;事务模块依赖连接池模块的 IPool
接口和数据库连接模块的 IConnector
接口;连接池模块依赖数据库连接模块的 IConnectorBuilder
接口。UML 类图
入口
class Query { use Builder; public const MODEL_READ = 'read'; public const MODEL_WRITE = 'write'; private $transaction; public function __construct(ITransaction $transaction) { $this->transaction = $transaction; } /** * 开启事务 */ public function begin($model = 'write'): bool { return $this->transaction->begin($model); } /** * 提交事务 */ public function commit(): bool { return $this->transaction->commit(); } /** * 回滚事务 */ public function rollback(): bool { return $this->transaction->rollback(); } /** * 便捷方法:列表查询 */ public function list(): array { $list = $this->transaction->command(...$this->compile()); if ($list === false) { throw new DBException($this->lastError(), $this->lastErrorNo()); } return $list; } /** * 便捷方法:查询一行记录 */ public function one(): array { $list = $this->transaction->command(...$this->limit(1)->compile()); if ($list === false) { throw new DBException($this->lastError(), $this->lastErrorNo()); } if ($list) { return $list[0]; } return []; } ... /** * 执行 SQL * 有两种方式: * 1. 调此方法时传入相关参数; * 2. 通过 Builder 提供的 Active Record 方法组装 SQL,调此方法(不传参数)执行并返回结果 */ public function execute(string $preSql = '', array $params = []) { if (!func_num_args()) { $result = $this->transaction->command(...$this->compile()); } else { $result = $this->transaction->command(...$this->prepareSQL($preSql, $params)); } if ($result === false) { throw new DBException($this->lastError(), $this->lastErrorNo()); } return $result; } public function lastInsertId() { return $this->transaction->lastInsertId(); } public function affectedRows() { return $this->transaction->affectedRows(); } }
Trait Builder { ... public function select($fields = null) { if ($this->type) { return $this; } $this->type = 'select'; $this->fields($fields); return $this; } /** * 预处理 SQL * @param string $sql 格式:select * from t_name where uid=:uid * @param array $params 格式:['uid' => $uid] * @return array 输出格式:sql: select * from t_name where uid=?,params: [$uid] * @throws Exception */ private function prepareSQL(string $sql, array $params) { $sql = trim($sql); if (!$params) { return [$sql, []]; } preg_match_all('/:([^s;]+)/', $sql, $matches); if (!($matches = $matches[1])) { return [$sql, []]; } if (count($matches) !== count($params)) { throw new Exception("SQL 占位数与参数个数不符。SQL:$sql,参数:" . print_r($params, true)); } $p = []; foreach ($matches as $flag) { if (!array_key_exists($flag, $params)) { throw new Exception("SQL 占位符与参数不符。SQL:$sql,参数:" . print_r($params, true)); } $value = $params[$flag]; if ($this->isExpression($value)) { $sql = preg_replace("/:$flag(?=s|$)/", $value, $sql); } else { $p[] = $value; } } $sql = preg_replace('/:[-a-zA-Z0-9_]+/', '?', $sql); return [$sql, $p]; } /** * 编译 * 目前仅支持 select,update,insert,replace,delete * @param bool $reset 编译后是否重置构造器 * @return array [$preSql, $params] */ private function compile(bool $reset = true) { if (!$this->type) { return ['', []]; } $method = 'compile' . ucfirst($this->type); if (method_exists($this, $method)) { $this->rawSqlInfo = $this->$method(); if ($reset) { $this->reset(); } return $this->rawSqlInfo; } return ['', []]; } private function compileSelect() { $sql = "select $this->fields "; $params = []; if ($this->table) { $sql .= implode( ' ', array_filter([ 'from', $this->table, $this->join, $this->where, $this->groupBy, $this->having, $this->orderBy, $this->limitStr() ]) ); $params = array_merge($this->joinParams, $this->whereParams, $this->havingParams); } return [$this->trimSpace($sql), $params]; } ... /** * 条件(where、on、having 等) * 为了记忆和使用方便,目前只提供了最基本的一些形式,复杂的条件请使用原生写法 * $conditions 数组格式: * // 基本的 and 查询 * [ * 'uid' => 232, * 'name' => '里斯', * 'b.age' => 34, * 'level_id' => [1,2,3], // in * 'count' => new Expression('count + 1'), * ] * * [ * "(uid=:uid1 or uid=:uid2) and count=:count", // 原生预处理 SQL * ['uid1' => 12, 'uid2' => 13, 'count' => new Expression('count+1')] * ] * @param string|array $conditions * @return array [$preSql, $params],$preSql: 用 ? 占位的预处理 SQL * @throws Exception */ private function condition($conditions) { if (is_string($conditions)) { return [$conditions, []]; } if (!$conditions || !is_array($conditions)) { return []; } if (is_int(key($conditions)) && count($conditions) <= 2) { if (count($conditions) == 1) { $conditions[1] = []; } return $this->prepareSQL($conditions[0], $conditions[1]); } $where = '1=1'; $params = []; foreach ($conditions as $key => $condition) { $key = $this->plainText($key); if (is_array($condition)) { // in 查询 $where .= " and $key in(" . implode(',', array_fill(0, count($condition), '?')) . ')'; $params = array_merge($params, $condition); } else { // = 查询 if ($this->isExpression($condition)) { $where .= " and $key = $condition"; } else { $where .= " and $key = ?"; $params[] = $condition; } } } return [str_replace('1=1 and ', '', $where), $params]; } ... }
该构造器并未对所有的 SQL 语句做方法上的实现(比如子查询),只对最常用的功能提供了支持,复杂的 SQL 建议直接写 SQL 语句(一些框架对复杂 SQL 构造也提供了方法级别的支持,但这其实会带来使用和维护上的复杂性,它导致 SQL 不够直观)。事务
$query = new Query(...); for ($i = 0; $i < 2; $i++) { go(function () use ($query) { $query->begin(); $user = $query->select("uid,name")->from("users")->where("phone=:phone", ["phone" => "13908987654"])->one(); $query->update('users')->set(['name' => "李四"])->where("uid=:uid", ['uid' => $user['uid']])->execute(); $query->commit(); }); }
/** * MySQL 仓储基类 * 仓储是单例模式(通过容器实现单例),多协程会共享同一个仓储实例 */ abstract class MySQLRepository extends Repository implements ITransactional { /** * 查询器 */ protected $query; public function __construct() { if (!$this->dbAlias()) { throw new Exception('dbName can not be null'); } // 通过工厂创建查询器实例 $this->query = MySQLFactory::build($this->dbAlias()); } ... }
TContext
类实现协程间数据的隔离,事务类 CoTransaction
持有 TContext
实例,事务中所有的状态信息都通过 TContext
存取,以实现协程间状态数据互不影响。class TContext implements ArrayAccess { private $container = []; ... public function offsetGet($offset) { if (!isset($this->container[Co::getuid()])) { return null; } return $this->container[Co::getuid()][$offset] ?? null; } public function offsetSet($offset, $value) { $cuid = Co::getuid(); if (!isset($this->container[$cuid])) { $this->init(); } $this->container[$cuid][$offset] = $value; } private function init() { $this->container[Co::getuid()] = []; // 协程退出时需要清理当前协程上下文 Co::defer(function () { unset($this->container[Co::getuid()]); }); } }
// 创建上下文实例 $context = new TContext(); // 设置当前协程的数据 $context["model"] = "write"; // 访问当前协程的数据 $context["model"];
interface ITransaction { public function begin(string $model = 'write', bool $isImplicit = false): bool; /** * 发送 SQL 指令 */ public function command(string $preSql, array $params = []); /** * 提交事务 * @param bool $isImplicit 是否隐式事务,隐式事务不会向 MySQL 提交 commit (要求数据库服务器开启了自动提交的配置) * @return bool * @throws Exception */ public function commit(bool $isImplicit = false): bool; public function rollback(): bool; /** * 获取或设置当前事务执行模式 * @param string 读/写模式 read/write * @return string 当前事务执行模式 */ public function model(?string $model = null): string; ... /** * 获取一次事务中执行的 SQL 列表 * @return array */ public function sql():array; }
CoTransaction
,该类是整个查询器中最重要的类,我们把整个类的代码完整贴出来:/** * 协程版事务管理器 * 注意:事务开启直到提交/回滚的过程中会一直占用某个 IConnector 实例,如果有很多长事务,则会很快耗完连接池资源 */ class CoTransaction implements ITransaction { private $pool; // 事务的所有状态信息(运行状态、SQL、运行模式、运行结果等)都是存储在上下文中 private $context; /** * 创建事务实例时需要提供连接池,并在内部创建该事物的协程上下文实例 */ public function __construct(IPool $pool) { $this->pool = $pool; $this->context = new TContext(); } public function __destruct() { // 如果事务没有结束,则回滚 if ($this->isRunning()) { $this->rollback(); } } /** * 开启事务 */ public function begin(string $model = 'write', bool $isImplicit = false): bool { // 如果事务已经开启了,则直接返回 if ($this->isRunning()) { return true; } // 事务模式(决定从读连接池还是写连接池拿连接对象) $this->model($model); // 设置事务运行状态 $this->isRunning(true); // 获取数据库连接 try { if (!($connector = $this->connector())) { throw new ConnectException("获取连接失败"); } } catch (Exception $exception) { $this->isRunning(false); throw new TransactionException($exception->getMessage(), $exception->getCode()); } // 开启新事务前,需要清除上一次事务的数据 $this->resetLastExecInfo(); $this->clearSQL(); // 调用数据库连接对象的 begin 方法开始事务(如果是隐式事务则不调用) return $isImplicit || $connector->begin(); } /** * 执行 SQL 指令 * 如果是隐式事务,则在该方法中自动调用 begin 和 commit 方法 */ public function command(string $preSql, array $params = []) { if (!$preSql) { return false; } // 是否隐式事务:外界没有调用 begin 而是直接调用 command 则为隐式事务 $isImplicit = !$this->isRunning(); // 如果是隐式事务,则需要自动开启事务 if ($isImplicit && !$this->begin($this->calcModelFromSQL($preSql), true)) { return false; } // 执行 SQL $result = $this->exec([$preSql, $params]); // 隐式事务需要及时提交 if ($isImplicit && !$this->commit($isImplicit)) { return false; } return $result; } /** * 提交事务 */ public function commit(bool $isImplicit = false): bool { if (!$this->isRunning()) { return true; } $result = true; if (!$isImplicit) { // 显式事务才需要真正提交到 MySQL 服务器 if ($conn = $this->connector(false)) { $result = $conn->commit(); if ($result === false) { // 执行失败,试图回滚 $this->rollback(); return false; } } else { return false; } } // 释放事务占用的资源 $this->releaseTransResource(); return $result; } /** * 回滚事务 * 无论是提交还是回滚,都需要释放本次事务占用的资源 */ public function rollback(): bool { if (!$this->isRunning()) { return true; } if ($conn = $this->connector(false)) { $conn->rollback(); } $this->releaseTransResource(); return true; } /** * 获取或设置当前事务执行模式 */ public function model(?string $model = null): string { // 事务处于开启状态时不允许切换运行模式 if (!isset($model) || $this->isRunning()) { return $this->context['model']; } $this->context['model'] = $model === 'read' ? 'read' : 'write'; return $model; } public function lastInsertId() { return $this->getLastExecInfo('insert_id'); } public function affectedRows() { return $this->getLastExecInfo('affected_rows'); } public function lastError() { return $this->getLastExecInfo('error'); } public function lastErrorNo() { return $this->getLastExecInfo('error_no'); } /** * 本次事务执行的所有 SQL * 该版本并没有做记录 */ public function sql(): array { return $this->context['sql'] ?? []; } /** * 释放当前事务占用的资源 */ private function releaseTransResource() { // 保存本次事务相关执行结果供外界查询使用 $this->saveLastExecInfo(); // 归还连接资源 $this->giveBackConnector(); unset($this->context['model']); $this->isRunning(false); } /** * 保存事务最终执行的一些信息 */ private function saveLastExecInfo() { if ($conn = $this->connector(false)) { $this->context['last_exec_info'] = [ 'insert_id' => $conn->insertId(), 'error' => $conn->lastError(), 'error_no' => $conn->lastErrorNo(), 'affected_rows' => $conn->affectedRows(), ]; } else { $this->context['last_exec_info'] = []; } } private function resetLastExecInfo() { unset($this->context['last_exec_info']); } private function getLastExecInfo(string $key) { return isset($this->context['last_exec_info']) ? $this->context['last_exec_info'][$key] : ''; } /** * 执行指令池中的指令 * @param $sqlInfo * @return mixed * @throws */ private function exec(array $sqlInfo) { if (!$sqlInfo || !$this->isRunning()) { return true; } return $this->connector()->query($sqlInfo[0], $sqlInfo[1]); } private function clearSQL() { unset($this->context['sql']); } private function calcModelFromSQL(string $sql): string { if (preg_match('/^(update|replace|delete|insert|drop|grant|truncate|alter|create)s/i', trim($sql))) { return 'write'; } return 'read'; } /** * 获取连接资源 */ private function connector(bool $usePool = true) { if ($connector = $this->context['connector']) { return $connector; } if (!$usePool) { return null; } $this->context['connector'] = $this->pool->getConnector($this->model()); return $this->context['connector']; } /** * 归还连接资源 */ private function giveBackConnector() { if ($this->context['connector']) { $this->pool->pushConnector($this->context['connector']); } unset($this->context['connector']); } private function isRunning(?bool $val = null) { if (isset($val)) { $this->context['is_running'] = $val; } else { return $this->context['is_running'] ?? false; } } }
CoTransaction
依赖 IPool
连接池,这种设计并不合理(违反了迪米特法则)。从逻辑上说,事务管理类真正依赖的是连接对象,而非连接池对象,因而事务模块应该依赖连接模块而不是连接池模块。让事务管理类依赖连接池,一方面向事务模块暴露了连接管理的细节, 另一方面意味着如果使用该事务管理类,就必须使用连接池技术。interface IConnectorManager { public function getConnector() IConnector; public function giveBackConnector(IConnector $conn); }
IConnectorManager
注入到 CoTransaction
中:class CoTransaction implements ITransaction { ... public function __construct(IConnectorManager $connMgr) { ... } }
IConnectorManager
承担了工厂方法角色,至此,事务模块仅依赖连接模块,而不用依赖连接池。连接池
IConnector
的实例;从实现上来说,连接模块同时又依赖连接池模块:PoolConnectorManager
(使用连接池技术的连接管理器)依赖连接池模块来操作连接对象(由于该依赖是实现层面的而非接口层面,因而它不是必然的,如果连接管理器不使用连接池技术则不需要依赖连接池模块)。“连接管理器”这个角色很重要:它对外(事务模块)屏蔽了连接池模块的存在,代价是在内部引入了对连接池模块的依赖(也就是用内部依赖换外部依赖)。interface IPool { /** * 从连接池中获取连接对象 */ public function getConnector(string $type = 'write'): IConnector; /** * 归还连接 */ public function pushConnector(IConnector $connector): bool; /** * 连接池中连接数 * @return array ['read' => 3, 'write' => 3] */ public function count(): array; /** * 关闭连接池 */ public function close(): bool; }
count()
方法返回的数据里面有 read、write,它暴露了一个细节:该连接池内部维护了读写两种连接池)。此设计也透露出该模块和连接模块的强耦合性。CoPool
稍显复杂,我们贴出代码:class CoPool implements IPool { ... protected static $container = []; protected $readPool; protected $writePool; // 数据库连接生成器,连接池使用此生成器创建连接对象 protected $connectorBuilder; // 连接池大小 protected $size; // 记录每个连接的相关信息 protected $connectsInfo = []; // 当前存活的连接数(包括不在池中的) protected $connectNum; // 读连接数 protected $readConnectNum; // 写连接数 protected $writeConnectNum; protected $maxSleepTime; protected $maxExecCount; protected $status; // 连续等待连接对象失败次数(一般是长事务导致某次事务处理长时间占用连接资源) protected $waitTimeoutNum; protected function __construct(IConnectorBuilder $connectorBuilder, int $size = 25, int $maxSleepTime = 600, int $maxExecCount = 1000) { // 创建读写 Channel $this->readPool = new coChannel($size); $this->writePool = new coChannel($size); ... } /** * 单例 * 实际是伪单例 */ public static function instance(IConnectorBuilder $connectorBuilder, int $size = 25, int $maxSleepTime = 600, int $maxExecCount = 1000): CoPool { // 同一个连接生成器创建的连接对象由同一个连接池管理 if (!isset(static::$container[$connectorBuilder->getKey()])) { static::$container[$connectorBuilder->getKey()] = new static($connectorBuilder, $size, $maxSleepTime, $maxExecCount); } return static::$container[$connectorBuilder->getKey()]; } /** * 从连接池中获取 MySQL 连接对象 */ public function getConnector(string $type = 'write'): IConnector { if (!$this->isOk()) { throw new PoolClosedException("连接池已经关闭,无法获取连接"); } // 根据读写模式选择是使用读连接池还是写连接池 $pool = $this->getPool($type); // 连接池是空的,试图创建连接 if ($pool->isEmpty()) { // 超额,不能再创建,需等待 // 此处试图创建的数量大于连接池真正大小,是为了应对高峰期等异常情况。归还的时候多创建出来的会直接被关闭掉(随建随销) if (($type == 'read' ? $this->readConnectNum : $this->writeConnectNum) > $this->size * 6) { // 多次等待失败,则直接返回 // 优化点:这里面没有针对读写池分别计数 if ($this->waitTimeoutNum > self::MAX_WAIT_TIMEOUT_NUM) { // 超出了等待失败次数限制,直接抛异常 throw new ConnectFatalException("多次获取连接超时,请检查数据库服务器状态"); } // 等待连接归还 $conn = $pool->pop(4); // 等待失败 if (!$conn) { switch ($pool->errCode) { case SWOOLE_CHANNEL_TIMEOUT: // 优化:要区分读池子超时还是写池子超时 $this->waitTimeoutNum++; $errMsg = "获取连接超时"; break; case SWOOLE_CHANNEL_CLOSED: $errMsg = "获取连接失败:连接池已关闭"; break; default: $errMsg = "获取连接失败"; break; } throw new ConnectException($errMsg); } } else { try { // 创建新连接 $conn = $this->createConnector($type); } catch (ConnectException $exception) { if ($exception->getCode() == 1040) { // 连接数据库时失败:Too many connections,此时需要等待被占用的连接归还 // 这里可以优化下:先判断下该连接池有无在维护的连接,有的话才等待 $conn = $pool->pop(4); } if (!$conn) { // 等待连接超时,记录超时次数 if ($pool->errCode == SWOOLE_CHANNEL_TIMEOUT) { $this->waitTimeoutNum++; } throw new ConnectException($exception->getMessage(), $exception->getCode()); } } } } else { // 从连接池获取 $conn = $pool->pop(1); } // 变更该连接的状态信息 $connectInfo = $this->connectInfo($conn); $connectInfo->popTime = time(); $connectInfo->status = ConnectorInfo::STATUS_BUSY; // 成功拿到连接,将等待次数清零 $this->waitTimeoutNum = 0; return $conn; } /** * 归还连接 * @param IConnector $connector * @return bool */ public function pushConnector(IConnector $connector): bool { if (!$connector) { return true; } $connInfo = $this->connectInfo($connector); $pool = $this->getPool($connInfo->type); // 如果连接池有问题(如已关闭)、满了、连接有问题,则直接关闭 if (!$this->isOk() || $pool->isFull() || !$this->isHealthy($connector)) { return $this->closeConnector($connector); } // 变更连接状态 if ($connInfo) { $connInfo->status = ConnectorInfo::STATUS_IDLE; $connInfo->pushTime = time(); } return $pool->push($connector); } /** * 关闭连接池 * @return bool */ public function close(): bool { $this->status = self::STATUS_CLOSED; // 关闭通道中所有的连接。等待5ms为的是防止还有等待push的排队协程 while ($conn = $this->readPool->pop(0.005)) { $this->closeConnector($conn); } while ($conn = $this->writePool->pop(0.005)) { $this->closeConnector($conn); } // 关闭通道 $this->readPool->close(); $this->writePool->close(); return true; } public function count(): array { return [ 'read' => $this->readPool->length(), 'write' => $this->writePool->length() ]; } protected function closeConnector(IConnector $connector) { if (!$connector) { return true; } $objId = $this->getObjectId($connector); // 关闭连接 $connector->close(); // 处理计数 $this->untickConnectNum($this->connectsInfo[$objId]->type); // 删除统计信息 unset($this->connectsInfo[$objId]); return true; } protected function isOk() { return $this->status == self::STATUS_OK; } protected function getPool($type = 'write'): coChannel { if (!$type || !in_array($type, ['read', 'write'])) { $type = 'write'; } return $type === 'write' ? $this->writePool : $this->readPool; } /** * 创建新连接对象 * @param string $type * @return IConnector * @throws DevMySQLExceptionConnectException */ protected function createConnector($type = 'write'): IConnector { $conn = $this->connectorBuilder->build($type); if ($conn) { // 要在 connect 前 tick(计数),否则无法阻止高并发协程打入(因为 connect 会造成本协程控制权让出,此时本次计数还没有增加) $this->tickConnectNum($type); try { $conn->connect(); } catch (ConnectException $exception) { // 撤销 tick $this->untickConnectNum($type); throw new ConnectException($exception->getMessage(), $exception->getCode()); } // 创建本连接的统计信息实例 $this->connectsInfo[$this->getObjectId($conn)] = new ConnectorInfo($conn, $type); } return $conn; } protected function tickConnectNum(string $type) { $this->changeConnectNum($type, 1); } protected function untickConnectNum(string $type) { $this->changeConnectNum($type, -1); } private function changeConnectNum(string $type, $num) { $this->connectNum = $this->connectNum + $num; if ($type == 'read') { $this->readConnectNum = $this->readConnectNum + $num; } else { $this->writeConnectNum = $this->writeConnectNum + $num; } } /** * 检查连接对象的健康情况,以下情况视为不健康: * 1. SQL 执行次数超过阈值; * 2. 连接对象距最后使用时间超过阈值; * 3. 连接对象不是连接池创建的 * @param IConnector $connector * @return bool */ protected function isHealthy(IConnector $connector): bool { $connectorInfo = $this->connectInfo($connector); if (!$connectorInfo) { return false; } // 如果连接处于忙态(一般是还处于事务未提交状态),则一律返回 ok if ($connectorInfo->status === ConnectorInfo::STATUS_BUSY) { return true; } if ( $connectorInfo->execCount() >= $this->maxExecCount || time() - $connectorInfo->lastExecTime() >= $this->maxSleepTime ) { return false; } return true; } protected function connectInfo(IConnector $connector): ConnectorInfo { return $this->connectsInfo[$this->getObjectId($connector)]; } protected function getObjectId($object): string { return spl_object_hash($object); } }
连接
IConnector
和 IConnectorBuilder
接口。IConnectorManager
,另一方面将连接模块和连接池模块合并成一个大模块,因而整个连接模块对外暴露的是 IConnectorManager
和 IConnector
两个接口。)CoConnector
里面查询的处理:class CoConnector implements IConnector { ... public function __destruct() { $this->close(); } public function connect(): bool { if ($this->mysql->connected) { return true; } $conn = $this->mysql->connect($this->config); if (!$conn) { throw new ConnectException($this->mysql->connect_error, $this->mysql->connect_errno); } return $conn; } /** * 执行 SQL 语句 */ public function query(string $sql, array $params = [], int $timeout = 180) { $prepare = $params ? true : false; $this->execCount++; $this->lastExecTime = time(); // 是否要走 prepare 模式 if ($prepare) { $statement = $this->mysql->prepare($sql, $timeout); // 失败,尝试重新连接数据库 if ($statement === false && $this->tryReconnectForQueryFail()) { $statement = $this->mysql->prepare($sql, $timeout); } if ($statement === false) { $result = false; goto done; } // execute $result = $statement->execute($params, $timeout); // 失败,尝试重新连接数据库 if ($result === false && $this->tryReconnectForQueryFail()) { $result = $statement->execute($params, $timeout); } } else { $result = $this->mysql->query($sql, $timeout); // 失败,尝试重新连接数据库 if ($result === false && $this->tryReconnectForQueryFail()) { $result = $this->mysql->query($sql, $timeout); } } done: $this->lastExpendTime = time() - $this->lastExecTime; $this->peakExpendTime = max($this->lastExpendTime, $this->peakExpendTime); return $result; } ... /** * 失败重连 */ private function tryReconnectForQueryFail() { if ($this->mysql->connected || !in_array($this->mysql->errno, [2006, 2013])) { return false; } // 尝试重新连接 $connRst = $this->connect(); if ($connRst) { // 连接成功,需要重置以下错误(swoole 在重连成功后并没有重置这些属性) $this->mysql->error = ''; $this->mysql->errno = 0; $this->mysql->connect_error = ''; $this->mysql->connect_errno = 0; } return $connRst; } ... }
class CoConnectorBuilder implements IConnectorBuilder { protected static $container = []; protected $writeConfig; protected $readConfigs; protected $key; /** * 创建生成器的时候需要提供读写连接配置,其中读配置是一个数组(可以有多个读连接) */ protected function __construct(DBConfig $writeConfig = null, array $readConfigs = []) { $this->writeConfig = $writeConfig; $this->readConfigs = $readConfigs; } /** * 这里采用伪单例:同样的读写配置使用同一个生成器 */ public static function instance(DBConfig $writeConfig = null, array $readConfigs = []): CoConnectorBuilder { if ($writeConfig && !$readConfigs) { $readConfigs = [$writeConfig]; } $key = self::calcKey($writeConfig, $readConfigs); if (!isset(self::$container[$key])) { $builder = new static($writeConfig, $readConfigs); $builder->key = $key; self::$container[$key] = $builder; } return self::$container[$key]; } /** * 创建并返回 IConnector 对象 * @param string $connType read/write * @return IConnector */ public function build(string $connType = 'write'): IConnector { /** @var DBConfig */ $config = $connType == 'read' ? $this->getReadConfig() : $this->writeConfig; if (!($config instanceof DBConfig)) { return null; } return new CoConnector($config->host, $config->user, $config->password, $config->database, $config->port, $config->timeout, $config->charset); } public function getKey(): string { return $this->key; } private function getReadConfig() { return $this->readConfigs[mt_rand(0, count($this->readConfigs) - 1)]; } /** * 根据配置计算 key,完全相同的配置对应同样的 key */ private static function calcKey(DBConfig $writeConfig = null, array $readConfigs = []): string { $joinStr = function ($conf) { $arr = [ $conf->host, $conf->port, $conf->user, $conf->password, $conf->database, $conf->charset, $conf->timeout, ]; sort($arr); return implode('-', $arr); }; $readArr = []; foreach ($readConfigs as $readConfig) { $readArr[] = $joinStr($readConfig); } sort($readArr); return md5($joinStr($writeConfig) . implode('$', $readArr)); } }
DBConfig
是一个 DTO 对象,不再阐述。查询器的组装
class MySQLFactory { /** * @param string $dbAlias 数据库配置别名,对应配置文件中数据库配置的 key */ public static function build(string $dbAlias): Query { // 从配置文件获取数据库配置 $dbConf = Config::getInstance()->getConf("mysql.$dbAlias"); if (!$dbConf) { throw new ConfigNotFoundException("mysql." . $dbAlias); } if (!isset($dbConf['read']) && !isset($dbConf['write'])) { $writeConf = $dbConf; $readConfs = [$writeConf]; } else { $writeConf = $dbConf['write'] ?? []; $readConfs = $dbConf['read'] ?? [$writeConf]; } $writeConfObj = self::createConfObj($writeConf); $readConfObjs = []; foreach ($readConfs as $readConf) { $readConfObjs[] = self::createConfObj($readConf); } // 创建生成器、连接池、事务管理器 // 在优化后版本中,用连接管理器代替连接池的位置即可 $mySQLBuilder = CoConnectorBuilder::instance($writeConfObj, $readConfObjs); $pool = CoPool::instance($mySQLBuilder, $dbConf['pool']['size'] ?? 30); $transaction = new CoTransaction($pool); return new Query($transaction); } private static function createConfObj(array $config): DBConfig { if (!$config) { throw new Exception("config is null"); } return new DBConfig( $config['host'], $config['user'], $config['password'], $config['database'], $config['port'] ?? 3306, $config['timeout'] ?? 3, $config['charset'] ?? 'utf8' ); } }
总结
CoTransaction
时必须同时使用连接池;
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算