Start from one premise: a business system should only consider sharding (splitting databases and tables) once its data — existing or incoming — has genuinely reached a scale that demands horizontal scaling. If the data volume is still manageable, do not shard prematurely: sharding is a design that adds real complexity to the system.
1. First, Think It Through: Why Do You Need Sharding?
A common rule of thumb says that once a single table grows past roughly 5 million rows or 2 GB, MySQL's performance limits start to show even with good indexes and tuned SQL. Typical symptoms:
- Query latency degrades from milliseconds to seconds
- Writes suffer lock contention and transactions time out frequently
- Backup and restore take so long that business continuity is at risk
- The single instance exhausts its connection limit and cannot scale horizontally
At that point, sharding becomes the way to break through single-machine limits. But to be clear: if you can avoid sharding, avoid it — exhaust index optimization, read/write splitting, and caching first.
1.1 Overall sharding architecture
```mermaid
graph TB
    subgraph "Application layer"
        App["Business app PHP/Go services"]
        SDK["Sharding SDK route calculation"]
    end
    subgraph "Middleware layer"
        LB["Load balancer Nginx/HAProxy"]
        Proxy["DB proxy optional: ShardingSphere-Proxy"]
    end
    subgraph "Data layer - sharded databases"
        direction TB
        DS0["Datasource 0 ds_0"]
        DS1["Datasource 1 ds_1"]
        DS2["Datasource 2 ds_2"]
        DS3["Datasource 3 ds_3"]
    end
    subgraph "Tables in ds_0"
        T0_0["orders_0 ~1M rows"]
        T0_1["orders_1 ~1M rows"]
        T0_2["orders_2 ~1M rows"]
        T0_3["orders_3 ~1M rows"]
    end
    subgraph "Tables in ds_1"
        T1_0["orders_0 ~1M rows"]
        T1_1["orders_1 ~1M rows"]
        T1_2["orders_2 ~1M rows"]
        T1_3["orders_3 ~1M rows"]
    end
    App --> SDK
    SDK --> LB
    LB --> Proxy
    Proxy --> DS0
    Proxy --> DS1
    Proxy --> DS2
    Proxy --> DS3
    DS0 --> T0_0
    DS0 --> T0_1
    DS0 --> T0_2
    DS0 --> T0_3
    DS1 --> T1_0
    DS1 --> T1_1
    DS1 --> T1_2
    DS1 --> T1_3
    style App fill:#e1f5ff
    style SDK fill:#fff4e1
    style DS0 fill:#f0f9e8
    style DS1 fill:#f0f9e8
    style DS2 fill:#f0f9e8
    style DS3 fill:#f0f9e8
```

Architecture notes:
- 32 databases × 32 tables each = 1,024 shards
- At roughly 1 million rows per shard, total capacity exceeds 1 billion rows
- Routing is pure bit arithmetic:
  db_index = user_id & 31, table_index = (user_id >> 5) & 31
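As a quick sanity check of that bit routing, the formula can be run by hand (the `route` helper is hypothetical, but the arithmetic is exactly the two expressions above for the 32 × 32 layout):

```go
package main

import "fmt"

// route splits a user ID into a database index (low 5 bits)
// and a table index (the next 5 bits): user_id & 31 and
// (user_id >> 5) & 31 for a 32-database x 32-table layout.
func route(userId int64) (db, table int64) {
	return userId & 31, (userId >> 5) & 31
}

func main() {
	db, table := route(1234567890)
	fmt.Printf("ds_%d.orders_%d\n", db, table) // ds_18.orders_22
}
```

Because both masks are powers of two minus one, the route is a couple of CPU instructions — no lookup table and no modulo of an arbitrary divisor.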
2. Unique ID Generation: the "ID Card" of a Distributed System
Once data is sharded, database auto-increment keys no longer work across shards, so you need a globally unique distributed ID. The mainstream options compared:
2.1 Option selection matrix
| Scheme | Pros | Cons | Best for |
|---|---|---|---|
| DB auto-increment | Simple and reliable | Single point of contention; cannot scale horizontally | Low-traffic services |
| Segment mode (e.g. Leaf) | High throughput, low latency, batch allocation | Needs a separately deployed service | General-purpose default |
| Snowflake | Roughly time-ordered; very fast (4M+ IDs/s in theory) | Clock-rollback risk; needs per-machine IDs | High-concurrency paths |
| Business-encoded IDs | Carry routing info, no extra lookup | Longer, less readable | Orders / transactions |
2.2 ID generation flow
```mermaid
sequenceDiagram
    participant Client as Business client
    participant SDK as ID generation SDK
    participant Cache as Redis/ZK clock guard
    participant Generator as Snowflake generator
    Client->>SDK: nextId(businessTag, userId)
    activate SDK
    SDK->>Cache: getLastTimestamp()
    activate Cache
    Cache-->>SDK: lastTimestamp
    deactivate Cache
    alt clock rollback detected
        SDK->>SDK: currentTime < lastTimestamp
        SDK->>Client: Error: Clock moved backwards
    else normal path
        SDK->>Generator: generate(timestamp)
        activate Generator
        Generator->>Generator: combine bit fields: 41-bit timestamp, 5-bit worker ID, 1-bit business flag, 11-bit user shard, 5-bit sequence
        Generator-->>SDK: 64-bit integer ID
        deactivate Generator
        SDK->>Cache: updateTimestamp(currentTime)
        activate Cache
        Cache-->>SDK: OK
        deactivate Cache
        SDK-->>Client: "ID: 1847293847562938475"
    end
    deactivate SDK
    Note over Client,Generator: "ID structure sample (low bits): 00011011 01001011 11010101 01101001"
```

2.3 Production pick: an enhanced Snowflake in Go
One note on the field widths: a classic Snowflake layout (41-bit timestamp + 10 bits of machine ID + 12-bit sequence) leaves no room for an extra business flag and an 11-bit user shard — the original mix would need 75 bits. The variant below therefore trims the sequence to 5 bits (32 IDs per worker per millisecond, ~32,000/s) and folds the datacenter ID into the worker ID, so everything fits in the 63 usable bits of a positive int64.

```go
package idgen

import (
	"errors"
	"sync"
	"time"
)

// Bit layout, high to low (63 usable bits):
//   41 timestamp | 5 worker | 1 business | 11 user shard | 5 sequence
const (
	epoch = 1672531200000 // 2023-01-01 00:00:00 UTC, in milliseconds

	workerBits    uint8 = 5
	businessBits  uint8 = 1
	userShardBits uint8 = 11
	stepBits      uint8 = 5

	maxWorker   = -1 ^ (-1 << workerBits)
	maxSequence = -1 ^ (-1 << stepBits)
)

type Snowflake struct {
	mu        sync.Mutex
	timestamp int64
	sequence  int64
	workerId  int64
}

func NewSnowflake(workerId int64) (*Snowflake, error) {
	if workerId > maxWorker || workerId < 0 {
		return nil, errors.New("workerId out of range")
	}
	return &Snowflake{workerId: workerId}, nil
}

func (sf *Snowflake) NextID(businessTag, userId int64) (int64, error) {
	sf.mu.Lock()
	defer sf.mu.Unlock()

	now := time.Now().UnixMilli()
	if now < sf.timestamp {
		return 0, errors.New("clock moved backwards")
	}
	if now == sf.timestamp {
		sf.sequence = (sf.sequence + 1) & maxSequence
		if sf.sequence == 0 {
			// Sequence exhausted for this millisecond: spin until the next one.
			for now <= sf.timestamp {
				now = time.Now().UnixMilli()
			}
		}
	} else {
		sf.sequence = 0
	}
	sf.timestamp = now

	userShard := userId & ((1 << userShardBits) - 1)
	id := (now - epoch) << (workerBits + businessBits + userShardBits + stepBits)
	id |= sf.workerId << (businessBits + userShardBits + stepBits)
	id |= (businessTag & 1) << (userShardBits + stepBits)
	id |= userShard << stepBits
	id |= sf.sequence
	return id, nil
}
```
2.4 PHP implementation (for legacy projects)
```php
<?php

declare(strict_types=1);

/**
 * Same bit layout as the Go version:
 *   41 timestamp | 5 worker | 1 business | 11 user shard | 5 sequence.
 *
 * Note on locking: PHP-FPM handles one request per process, so no in-process
 * mutex is needed — but $sequence / $lastTimestamp also do not survive across
 * requests. Run the generator in a long-lived worker (CLI, Swoole) or persist
 * its state in APCu/Redis before using this in production.
 */
class Snowflake
{
    private const EPOCH = 1672531200000; // 2023-01-01 00:00:00 UTC, in ms
    private const WORKER_BITS = 5;
    private const BUSINESS_BITS = 1;
    private const USER_SHARD_BITS = 11;
    private const STEP_BITS = 5;

    private int $workerId;
    private int $sequence = 0;
    private int $lastTimestamp = -1;

    public function __construct(int $workerId)
    {
        if ($workerId < 0 || $workerId > ((1 << self::WORKER_BITS) - 1)) {
            throw new InvalidArgumentException('workerId out of range');
        }
        $this->workerId = $workerId;
    }

    public function nextId(int $businessTag, int $userId): int
    {
        $timestamp = $this->currentTimeMillis();
        if ($timestamp < $this->lastTimestamp) {
            throw new RuntimeException('Clock moved backwards');
        }
        if ($timestamp === $this->lastTimestamp) {
            $this->sequence = ($this->sequence + 1) & ((1 << self::STEP_BITS) - 1);
            if ($this->sequence === 0) {
                // Sequence exhausted: wait for the next millisecond.
                while (($timestamp = $this->currentTimeMillis()) <= $this->lastTimestamp) {
                    usleep(100);
                }
            }
        } else {
            $this->sequence = 0;
        }
        $this->lastTimestamp = $timestamp;

        $userShard = $userId & ((1 << self::USER_SHARD_BITS) - 1);

        return (($timestamp - self::EPOCH) << (self::WORKER_BITS + self::BUSINESS_BITS + self::USER_SHARD_BITS + self::STEP_BITS))
            | ($this->workerId << (self::BUSINESS_BITS + self::USER_SHARD_BITS + self::STEP_BITS))
            | (($businessTag & 1) << (self::USER_SHARD_BITS + self::STEP_BITS))
            | ($userShard << self::STEP_BITS)
            | $this->sequence;
    }

    private function currentTimeMillis(): int
    {
        return (int) (microtime(true) * 1000);
    }
}
```
Why this scheme works:
- ✅ Self-routing: the user shard is embedded in the ID's low bits, so lookups need no separate routing table
- ✅ Roughly increasing: the timestamp sits in the high bits, which is friendly to B+-tree inserts and range scans
- ✅ Clock-rollback protection: a mutex plus a timestamp check prevents duplicate IDs
- ✅ Business isolation: businessTag separates different business lines
📌 Practical note: the low 11 bits of the user ID (0–2047) are enough to address thousands of shards, and a mod-2ⁿ strategy keeps later scale-out smooth.
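To make the "self-routing" property concrete, here is a minimal sketch of recovering the shard from an ID. The `extractUserShard` helper and the toy ID are illustrative; the `seqBits`/`shardBits` values must match whatever widths your generator actually uses, or the wrong bits get read:

```go
package main

import "fmt"

// extractUserShard recovers the embedded user-shard field from an ID whose
// low bits are: [... | user shard (shardBits) | sequence (seqBits)].
func extractUserShard(id int64, seqBits, shardBits uint) int64 {
	return (id >> seqBits) & ((1 << shardBits) - 1)
}

func main() {
	// Build a toy ID by hand: some high bits, shard 1234, sequence 42.
	const seqBits, shardBits = 12, 11
	id := (int64(98765) << (seqBits + shardBits)) | (int64(1234) << seqBits) | 42

	shard := extractUserShard(id, seqBits, shardBits)
	fmt.Println(shard)      // the shard comes straight out of the ID
	fmt.Println(shard & 31) // and maps to one of 32 databases with a mask
}
```

This is what lets an order-detail lookup by order_id alone hit the right database without first resolving user_id.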
3. Sharding Scheme: Production-Grade Design Points (the Core)
3.1 Choosing a split strategy
Core rule for horizontal splitting
```
shard_count = 2^N   (32 / 64 / 128 / 1024 are typical choices)
Why: doubling a power-of-two shard count lets you migrate with bit
operations instead of re-hashing every row.
```
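One way to see why powers of two matter: when the database count doubles from 32 to 64, the new index keeps the old index in its low 5 bits, so every old shard splits into exactly two new shards (ds_N → ds_N and ds_N+32) and no row ever needs to move anywhere else. A brute-force check of that claim:

```go
package main

import "fmt"

func main() {
	for userId := int64(0); userId < 100000; userId++ {
		oldIdx := userId & 31 // 32 databases
		newIdx := userId & 63 // after doubling to 64
		// The old index must survive in the low 5 bits of the new index.
		if newIdx&31 != oldIdx {
			panic("unexpected row movement")
		}
	}
	fmt.Println("every old shard splits into exactly two new shards")
}
```

With a non-power-of-two count (say 32 → 48), `user_id % 48` scatters rows across almost all shards, forcing a full rehash and migration.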
Shard routing calculation flow
```mermaid
flowchart LR
    subgraph "Input"
        UserId["user_id 1234567890"]
    end
    subgraph "Database routing"
        DB_Bits["DB bits: 5 (32 databases)"]
        DB_Mask["Mask: 0x1F (31)"]
        DB_Op["Bit op: user_id & 31"]
        DB_Result["DB index: 18 ds_18"]
    end
    subgraph "Table routing"
        Shift["Shift right 5: user_id >> 5"]
        Table_Bits["Table bits: 5 (32 tables)"]
        Table_Mask["Mask: 0x1F (31)"]
        Table_Op["Bit op: shifted & 31"]
        Table_Result["Table index: 22 orders_22"]
    end
    subgraph "Final route"
        Final["Target shard: ds_18.orders_22"]
    end
    UserId --> DB_Op
    DB_Bits --> DB_Mask --> DB_Op
    DB_Op --> DB_Result
    UserId --> Shift
    Shift --> Table_Op
    Table_Bits --> Table_Mask --> Table_Op
    Table_Op --> Table_Result
    DB_Result --> Final
    Table_Result --> Final
    style UserId fill:#ffe6e6
    style Final fill:#e6ffe6
    style DB_Result fill:#fff4e6
    style Table_Result fill:#fff4e6
```

Routing strategy in Go
```go
package sharding

// ShardingConfig holds power-of-two shard counts (e.g. 32 databases x 32 tables).
type ShardingConfig struct {
	DatabaseCount int
	TableCount    int
}

// CalculateShard maps a user ID to a (database, table) pair: the low bits
// select the database, the next bits select the table.
func (cfg *ShardingConfig) CalculateShard(userId int64) (dbIndex, tableIndex int) {
	dbIndex = int(userId) & (cfg.DatabaseCount - 1)
	tableIndex = int(userId>>uint(cfg.dbBits())) & (cfg.TableCount - 1)
	return dbIndex, tableIndex
}

// dbBits returns log2(DatabaseCount) for power-of-two counts.
func (cfg *ShardingConfig) dbBits() int {
	bits := 0
	for n := cfg.DatabaseCount; n > 1; n >>= 1 {
		bits++
	}
	return bits
}

// ParseShardFromID recovers the user-shard field embedded in a generated ID.
// seqBits must match the generator's sequence width and userShardBits its
// shard-field width, otherwise the wrong bits are read.
func ParseShardFromID(id int64, seqBits, userShardBits uint8) int64 {
	return (id >> seqBits) & ((1 << userShardBits) - 1)
}
```
Routing in PHP (Laravel style)
```php
<?php

namespace App\Sharding;

class Router
{
    public function __construct(
        private int $databaseCount, // e.g. 32
        private int $tableCount     // e.g. 32
    ) {
        if ($this->databaseCount < 1 || ($this->databaseCount & ($this->databaseCount - 1)) !== 0) {
            throw new \InvalidArgumentException('databaseCount must be a power of 2');
        }
    }

    public function calculateShard(int $userId): array
    {
        $dbIndex = $userId & ($this->databaseCount - 1);
        $dbBits = (int) log2($this->databaseCount);
        $tableIndex = ($userId >> $dbBits) & ($this->tableCount - 1);

        return [
            'db' => sprintf('ds_%d', $dbIndex),
            'table' => sprintf('orders_%d', $tableIndex),
            'shard_key' => $userId,
        ];
    }

    /** Group a batch of user IDs by target shard so each shard is queried once. */
    public function calculateShards(array $userIds): array
    {
        $shards = [];
        foreach ($userIds as $userId) {
            $shard = $this->calculateShard($userId);
            $key = "{$shard['db']}.{$shard['table']}";
            $shards[$key]['db'] = $shard['db'];
            $shards[$key]['table'] = $shard['table'];
            $shards[$key]['user_ids'][] = $userId;
        }
        return array_values($shards);
    }
}
```
3.2 Golden rules for choosing a sharding key
```sql
CREATE TABLE orders (
    order_id    BIGINT PRIMARY KEY COMMENT 'distributed ID',
    user_id     BIGINT NOT NULL COMMENT 'sharding key: user dimension',
    shop_id     BIGINT NOT NULL COMMENT 'merchant ID (queried via a heterogeneous index)',
    create_time DATETIME NOT NULL,
    status      TINYINT DEFAULT 0,
    amount      DECIMAL(10,2),
    KEY idx_user_time (user_id, create_time),
    KEY idx_status (status)
) ENGINE=InnoDB COMMENT='orders main table';
```
Selection criteria:
- Query frequency: ~80% of queries should reach a single shard via the sharding key
- Even distribution: avoid sharding solely on skew-prone fields such as time or region
- Business stability: the key's value should rarely change (a phone number, for example, can be re-bound)
- Join friendliness: tables that are frequently joined should share the same sharding key (binding tables)
3.3 Smooth scale-out: a four-phase zero-downtime migration
```mermaid
graph TB
    subgraph "Phase 1: double write (2-4 weeks)"
        S1["App-level double write: new cluster primary, old cluster backup"]
        S1_Sync["Binlog sync Canal/Debezium"]
        S1_Check["Data-check job, hourly"]
    end
    subgraph "Phase 2: data validation (1 week)"
        S2["Full comparison, paged by ID range"]
        S2_Fix["Fix diffs automatically or manually"]
        S2_Confirm["Consistency confirmed: diff rate < 0.01%"]
    end
    subgraph "Phase 3: gradual read cutover (1-2 weeks)"
        S3["Ramp by user_id % 100"]
        S3_1["1% traffic to new cluster"]
        S3_10["10% traffic to new cluster"]
        S3_50["50% traffic to new cluster"]
        S3_100["100% traffic to new cluster"]
        S3_Monitor["Watch core metrics: error rate/RT/load"]
    end
    subgraph "Phase 4: stop old writes (1 day)"
        S4["Disable double write, old cluster read-only"]
        S4_Backup["Back up old cluster, keep 7 days"]
        S4_Archive["Archive history to cold storage"]
    end
    S1 --> S1_Sync --> S1_Check --> S2
    S2 --> S2_Fix --> S2_Confirm --> S3
    S3 --> S3_1 --> S3_10 --> S3_50 --> S3_100
    S3_100 --> S3_Monitor --> S4
    S4 --> S4_Backup --> S4_Archive
    style S1 fill:#fff4e1
    style S2 fill:#e1f5ff
    style S3 fill:#f0f9e8
    style S4 fill:#fce4ec
```

Go implementation: core double-write logic
```go
package migration

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

type OrderMigrationService struct {
	oldRepo    *OrderRepository
	newRepo    *OrderRepository
	syncer     *BinlogSyncer
	retryQueue *MigrationQueue // compensation queue for failed old-shard writes
	logger     *zap.Logger
}

// CreateOrder writes the new shard synchronously (source of truth) and the
// old shard asynchronously, queuing a compensation task on failure.
func (s *OrderMigrationService) CreateOrder(ctx context.Context, order *Order) error {
	if err := s.newRepo.Insert(ctx, order); err != nil {
		return fmt.Errorf("write new shard failed: %w", err)
	}
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := s.oldRepo.Insert(ctx, order); err != nil {
			s.logger.Error("async write old shard failed",
				zap.Int64("order_id", order.ID), zap.Error(err))
			s.retryQueue.Push(order)
		}
	}()
	return s.recordMigrationLog(order.ID, "NEW_PRIMARY")
}

// ValidateData compares an ID range across old and new shards in parallel.
func (s *OrderMigrationService) ValidateData(startId, endId int64) (*ValidationResult, error) {
	var oldOrders, newOrders []*Order
	eg, ctx := errgroup.WithContext(context.Background())
	eg.Go(func() error {
		var err error
		oldOrders, err = s.oldRepo.QueryRange(ctx, startId, endId)
		return err
	})
	eg.Go(func() error {
		var err error
		newOrders, err = s.newRepo.QueryRange(ctx, startId, endId)
		return err
	})
	if err := eg.Wait(); err != nil {
		return nil, err
	}

	oldMap := make(map[int64]*Order, len(oldOrders))
	for _, o := range oldOrders {
		oldMap[o.ID] = o
	}
	result := &ValidationResult{Total: len(newOrders)}
	for _, newOrd := range newOrders {
		if oldOrd, exists := oldMap[newOrd.ID]; !exists {
			result.MissingInOld++
		} else if !ordersEqual(oldOrd, newOrd) {
			result.ContentDiff++
		}
	}
	return result, nil
}
```
PHP gray-release cutover configuration
```php
<?php

// config/sharding.php
return [
    'orders' => [
        'shards' => [
            'database_count' => env('DB_SHARD_COUNT', 32),
            'table_count' => env('TABLE_SHARD_COUNT', 32),
        ],
        'gray_release' => [
            'enabled' => env('GRAY_ENABLED', false),
            'percentage' => env('GRAY_PERCENT', 0),
            'white_users' => explode(',', env('GRAY_WHITE_LIST', '')),
        ],
        'datasources' => [
            'old' => ['driver' => 'mysql', 'host' => env('DB_OLD_HOST')],
            'new' => ['driver' => 'mysql', 'host' => env('DB_NEW_HOST')],
        ],
    ],
];

class OrderService
{
    /** Decide which datasource serves this user during the cutover. */
    public function getDataSource(int $userId): string
    {
        $config = config('sharding.orders');

        // Whitelisted users always read the new cluster.
        if (in_array((string) $userId, $config['gray_release']['white_users'], true)) {
            return 'new';
        }
        // Percentage-based ramp: CRC32 gives each user a stable bucket.
        if ($config['gray_release']['enabled']) {
            $bucket = crc32((string) $userId) % 100;
            if ($bucket < (int) $config['gray_release']['percentage']) {
                return 'new';
            }
        }
        return 'old';
    }
}
```
4. Key Problems in Distributed Scenarios, and How to Solve Them
4.1 Query routing architecture
```mermaid
graph TB
    subgraph "Query entry"
        Request["HTTP/gRPC request user_id=123456"]
        Middleware["Middleware extracts the sharding key"]
    end
    subgraph "Routing decision"
        Router["Route calculator Router.calculateShard"]
        Cache["Route cache Redis, 5 min"]
        Decision{"Cross-shard?"}
    end
    subgraph "Single-shard query"
        SingleDB["Locate one database ds_18"]
        SingleTable["Locate one table orders_7"]
        DirectQuery["Direct query, fast"]
    end
    subgraph "Cross-shard query"
        MultiDB["Scan multiple databases ds_0 ~ ds_31"]
        Parallel["Parallel queries goroutines/coroutines"]
        Merge["Merge results: in-memory sort/paging"]
    end
    subgraph "Response"
        Response["JSON response order list"]
    end
    Request --> Middleware --> Router
    Router --> Cache
    Router --> Decision
    Decision -->|single shard| SingleDB
    Decision -->|multi shard| MultiDB
    SingleDB --> SingleTable --> DirectQuery
    MultiDB --> Parallel --> Merge
    DirectQuery --> Response
    Merge --> Response
    style Request fill:#ffe6e6
    style DirectQuery fill:#e6ffe6
    style Parallel fill:#fff4e6
    style Response fill:#e6e6ff
```

4.2 Cross-shard query optimization (concurrent Go)
```go
package query

import (
	"context"
	"sort"
	"time"

	"go.uber.org/zap"
)

type QueryService struct {
	repos       map[int]*OrderRepository // one repository per database shard
	shardingCfg *ShardingConfig
	logger      *zap.Logger
}

// QueryByUserIds fans out to the shards the user IDs map to, queries them
// in parallel, then merges and sorts the results in memory.
func (qs *QueryService) QueryByUserIds(ctx context.Context, userIds []int64,
	startTime, endTime time.Time) ([]*Order, error) {

	// Group user IDs by database shard so each shard is queried once.
	shardGroups := make(map[int][]int64)
	for _, uid := range userIds {
		dbIdx, _ := qs.shardingCfg.CalculateShard(uid)
		shardGroups[dbIdx] = append(shardGroups[dbIdx], uid)
	}

	type result struct {
		orders []*Order
		err    error
	}
	resultChan := make(chan result, len(shardGroups))
	for shardId, uids := range shardGroups {
		go func(sid int, uidList []int64) {
			orders, err := qs.repos[sid].QueryByUserIds(ctx, uidList, startTime, endTime)
			resultChan <- result{orders, err}
		}(shardId, uids)
	}

	// Collect: a failed shard is logged and skipped rather than failing the whole query.
	var allOrders []*Order
	for i := 0; i < len(shardGroups); i++ {
		res := <-resultChan
		if res.err != nil {
			qs.logger.Error("shard query failed", zap.Error(res.err))
			continue
		}
		allOrders = append(allOrders, res.orders...)
	}

	// Global ordering: newest first.
	sort.Slice(allOrders, func(i, j int) bool {
		return allOrders[i].CreateTime.After(allOrders[j].CreateTime)
	})
	return allOrders, nil
}
```
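One subtlety the merge step glosses over is pagination: `LIMIT offset, size` cannot be pushed down to each shard unchanged, because any shard might hold any of the globally newest rows. The common (and memory-hungry) workaround is to fetch the first `offset+size` rows from every shard, merge, then slice. A sketch with plain ints standing in for orders sorted newest-first:

```go
package main

import (
	"fmt"
	"sort"
)

// pageAcrossShards merges per-shard result sets (each already sorted
// descending) and returns the global page [offset, offset+size).
// Every shard must contribute its first offset+size rows — fewer, and
// the global ordering can silently go wrong.
func pageAcrossShards(shards [][]int, offset, size int) []int {
	var merged []int
	for _, rows := range shards {
		n := offset + size
		if n > len(rows) {
			n = len(rows)
		}
		merged = append(merged, rows[:n]...)
	}
	sort.Sort(sort.Reverse(sort.IntSlice(merged)))
	if offset >= len(merged) {
		return nil
	}
	end := offset + size
	if end > len(merged) {
		end = len(merged)
	}
	return merged[offset:end]
}

func main() {
	shards := [][]int{
		{90, 70, 50, 30}, // shard 0, newest first
		{95, 60, 40, 20}, // shard 1
	}
	fmt.Println(pageAcrossShards(shards, 2, 2)) // [70 60]
}
```

Deep pages make this explode (page 1000 × 32 shards pulls ~32,000× the page size), which is why cursor-based paging on (create_time, order_id) is usually preferred for sharded lists.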
4.3 Distributed transaction flow
```mermaid
sequenceDiagram
    participant Client as Client
    participant Order as OrderService
    participant DB1 as Orders DB ds_18
    participant MQ as Message queue RabbitMQ/Kafka
    participant DB2 as Inventory DB inventory_db
    participant Job as Scheduled job MessageProcessor
    Client->>Order: createOrder(userId, items)
    activate Order
    Order->>DB1: BEGIN TRANSACTION
    activate DB1
    Order->>DB1: "INSERT INTO orders (order_id, user_id, ...)"
    Order->>DB1: "INSERT INTO order_items (item_id, order_id, ...)"
    Order->>DB1: "INSERT INTO local_message (message_id, topic, payload, status=PENDING)"
    Order->>DB1: COMMIT
    deactivate DB1
    Note over Order,DB1: "Local transaction done, order created"
    Order-->>Client: return order_id
    deactivate Order
    par async relay
        Job->>DB1: "SELECT * FROM local_message WHERE status=PENDING LIMIT 100"
        activate DB1
        DB1-->>Job: pending messages
        deactivate DB1
        Job->>MQ: "Publish message topic: inventory.deduct"
        activate MQ
        MQ-->>Job: ACK
        deactivate MQ
        Job->>DB1: "UPDATE local_message SET status=SENT WHERE message_id=?"
        activate DB1
        DB1-->>Job: OK
        deactivate DB1
    and inventory consumer
        MQ->>DB2: "Consume message, deduct inventory"
        activate DB2
        DB2->>DB2: "UPDATE inventory SET stock=stock-qty WHERE sku_id=?"
        DB2-->>MQ: ACK
        deactivate DB2
    end
    Note over Job,DB2: "Eventual consistency: if the inventory deduction fails, the message is retried until it succeeds"
```

4.4 Distributed transactions: a local message table (PHP)
```php
<?php

class CreateOrderHandler
{
    public function __construct(
        private OrderRepository $orderRepo,
        private MessageRepository $messageRepo,
        private MQProducer $mqProducer,
        private InventoryService $inventoryService,
        private Snowflake $idGenerator,
    ) {}

    public function handle(CreateOrderCommand $cmd): Order
    {
        return DB::transaction(function () use ($cmd) {
            // 1. Create the order (local transaction).
            $order = $this->orderRepo->create([
                'id' => $this->idGenerator->nextId(1, $cmd->userId),
                'user_id' => $cmd->userId,
                'amount' => $cmd->amount,
                'status' => OrderStatus::PENDING,
                // ... other fields
            ]);

            // 2. Record the outbound message in the SAME transaction,
            //    so the order and the message commit or roll back together.
            $this->messageRepo->create([
                'message_id' => Str::uuid()->toString(),
                'business_key' => $order->id,
                'topic' => 'inventory.deduct',
                'payload' => json_encode([
                    'order_id' => $order->id,
                    'sku_id' => $cmd->skuId,
                    'quantity' => $cmd->quantity,
                ], JSON_UNESCAPED_UNICODE),
                'status' => MessageStatus::PENDING,
                'next_retry' => now(),
            ]);

            return $order;
        });
    }

    /** Relay job: push pending messages to MQ with capped exponential backoff. */
    public function processPendingMessages(): void
    {
        $messages = $this->messageRepo->getPending(100);
        foreach ($messages as $msg) {
            try {
                $this->mqProducer->send($msg->topic, $msg->payload);
                $this->messageRepo->markSent($msg->id);
            } catch (\Throwable $e) {
                $retryCount = $msg->retry_count + 1;
                $nextRetry = now()->addMinutes(min(2 ** $retryCount, 60));
                if ($retryCount >= 5) {
                    $this->messageRepo->markFailed($msg->id, $e->getMessage());
                    Alert::send('Message retry exhausted', ['message_id' => $msg->id]);
                } else {
                    $this->messageRepo->updateRetry($msg->id, $retryCount, $nextRetry);
                }
            }
        }
    }
}
```
4.5 Monitoring architecture
```mermaid
graph TB
    subgraph "Collection"
        SDK["App SDK metric reporting"]
        Exporter["Exporters MySQL/Redis"]
        Agent["Agents host metrics"]
    end
    subgraph "Storage"
        Prometheus["Prometheus time-series DB"]
        ES["Elasticsearch log storage"]
    end
    subgraph "Alerting"
        AlertManager["Alertmanager"]
        Rules["Alert rules PromQL"]
    end
    subgraph "Visualization"
        Grafana["Grafana dashboards"]
        Dashboard["Shard + business dashboards"]
    end
    subgraph "Notification"
        DingTalk["DingTalk/WeCom"]
        SMS["SMS/phone"]
        Email["Email"]
    end
    SDK --> Prometheus
    Exporter --> Prometheus
    Agent --> Prometheus
    Prometheus --> AlertManager
    Rules --> AlertManager
    Prometheus --> Grafana
    Grafana --> Dashboard
    AlertManager --> DingTalk
    AlertManager --> SMS
    AlertManager --> Email
    style Prometheus fill:#e1f5ff
    style AlertManager fill:#fff4e1
    style Grafana fill:#f0f9e8
```

4.6 Metric instrumentation (Go + Prometheus)
```go
package metrics

import (
	"context"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	shardQPS = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "sharding_query_total",
			Help: "Total queries per shard",
		},
		[]string{"db_instance", "table_name", "shard_id", "status"},
	)
	// A counter, not a gauge: derive the cross-shard ratio in PromQL by
	// dividing against sharding_query_total.
	crossShardQueries = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "sharding_cross_shard_query_total",
			Help: "Cross-shard queries",
		},
		[]string{"query_type"},
	)
	dataSkewness = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "sharding_data_skewness",
			Help: "Data distribution skewness (max/avg rows per shard)",
		},
		[]string{"table_name"},
	)
)

func init() {
	prometheus.MustRegister(shardQPS, crossShardQueries, dataSkewness)
}

// WrapQuery decorates a repository so every query is counted per shard.
func WrapQuery(repo OrderRepository, dbName, tableName string, shardId int) OrderRepository {
	return &monitoredRepo{repo: repo, dbName: dbName, tableName: tableName, shardId: shardId}
}

type monitoredRepo struct {
	repo      OrderRepository
	dbName    string
	tableName string
	shardId   int
}

func (m *monitoredRepo) Query(ctx context.Context, cond QueryCondition) ([]*Order, error) {
	start := time.Now()
	orders, err := m.repo.Query(ctx, cond)
	_ = time.Since(start) // latency: feed this into a histogram in a real setup

	status := "success"
	if err != nil {
		status = "error"
	}
	shardQPS.WithLabelValues(m.dbName, m.tableName, strconv.Itoa(m.shardId), status).Inc()
	if cond.HasMultiShard() {
		crossShardQueries.WithLabelValues("user_query").Inc()
	}
	return orders, err
}
```
5. Best-Practice Checklist (Ready for Production)
✅ Architecture design
✅ ID generation (language-agnostic points)
✅ Data migration (key steps)
```
1. [ ] Pre-create new shards: when going 32 → 64, create the extra 32 schemas/tables in advance
2. [ ] Double-write switch: toggled dynamically from the config center
3. [ ] Validation job: hourly consistency comparison, paged by ID range
4. [ ] Gray ramp: user_id % 100 controls the cutover ratio (1% → 10% → 50% → 100%)
5. [ ] Rollback plan: second-level datasource switch via config center + old cluster kept read-only for 7 days
6. [ ] Alerting: trigger on per-shard QPS > 2000, error rate > 0.1%, or data skew > 3.0
```
✅ Development conventions (PHP/Go)
```php
<?php

// Guardrail: every order query should carry the sharding key.
function queryOrders(array $conditions): array
{
    if (!isset($conditions['user_id'])) {
        if (app()->environment('production')) {
            // Don't break production: log it and let the (expensive) scatter query run.
            Log::warning('Query without sharding key', ['conditions' => $conditions]);
        } else {
            // Fail fast in dev/test so the missing key is caught early.
            throw new InvalidArgumentException('Missing sharding key: user_id');
        }
    }
    // ... route and execute the query
    return [];
}
```
```go
package middleware

import (
	"context"
	"errors"
	"net/http"
)

type ContextKey string

const ShardingKey ContextKey = "sharding:user_id"

// ShardingMiddleware extracts the sharding key once at the edge and puts it
// on the request context for every downstream repository call.
func ShardingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		userId := parseUserIdFromRequest(r)
		ctx := context.WithValue(r.Context(), ShardingKey, userId)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

func (r *OrderRepo) Query(ctx context.Context, cond Condition) ([]*Order, error) {
	userId, ok := ctx.Value(ShardingKey).(int64)
	if !ok {
		return nil, errors.New("missing sharding context")
	}
	shard := router.CalculateShard(userId)
	db := r.getDataSource(shard.DB)
	// ... execute the query against the routed datasource
	_ = db
	return nil, nil
}
```
✅ Operations safeguards
6. Closing Thoughts: Sharding Is No Silver Bullet
Sharding is the last resort for massive data, not the first choice. Before committing, confirm:
- ✅ Index tuning, SQL optimization, read/write splitting, and other basics are already exhausted
- ✅ Growth projections genuinely require horizontal scaling (not a short-term spike)
- ✅ The team can operate and troubleshoot distributed systems
- ✅ Monitoring, alerting, and rollback mechanisms are in place
🌟 Core principle:
"Simple beats complex; operability beats theoretical perfection."
Every new shard adds system complexity, so let business value drive the technical decision.
Remember: good architecture is not designed up front — it evolves and iterates as the business grows. Avoid introducing a distributed system unless you truly must; consider multi-tenancy or regional partitioning first, and let the realities and characteristics of the business drive how you split.
7. Appendix: a Complete Sharding Design Case
A production-grade sharding solution for a typical e-commerce orders (Orders) system on MySQL 8.0:
🔗 Link: https://www.wdft.com/c3cf6784.html
8. Final Notes
Java is absent from this article deliberately: the Java ecosystem already has mature sharding middleware such as ShardingSphere and MyCat, while the PHP and Go ecosystems are comparatively thin — there is no equally mature, ready-made solution there, and Java's solutions don't transplant directly.
Once more: good architecture is not designed, it is evolved, iterated, and distilled as the business grows — the outcome of balancing economic cost, data growth, and competing concerns.