A Code Walkthrough of PG State Transitions During Peering
I recently looked at how PG states change during peering. I have not yet studied every code detail; this is a record of a first rough pass through the code.
PG
A PG (Placement Group) is the basic unit of a storage pool: a collection of objects. Both the replicated and the erasure-coded redundancy strategies are implemented on top of PGs. A PG has many states, and the transitions between them are driven by a state machine.
Two scenarios trigger the peering process:
- when a PG is created
- when an OSD starts or stops, the OSDMap changes, and the PG's acting set changes as a result
State machine
The state machine is initialized when the PG is created.
class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
  RecoveryState *state;
public:
  PG *pg;
  utime_t event_time;
  uint64_t event_count;
  ...
boost::statechart provides the following building blocks:
- state_machine: the state machine itself
- state: a state
- event: an event; events are delivered to the machine through the process_event function
- transition / custom_reaction: a transition / a custom reaction; a custom_reaction is handled by the corresponding react function

The PG state machine's machine, states, and event handling live mainly in PG.h and PG.cc.
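To make these concepts concrete, here is a minimal, self-contained boost::statechart example. The Switch machine, the Off/On states, and the Toggle/Query events are hypothetical, invented purely for illustration; they are not from the Ceph source:

#include <iostream>
#include <boost/statechart/state_machine.hpp>
#include <boost/statechart/simple_state.hpp>
#include <boost/statechart/transition.hpp>
#include <boost/statechart/custom_reaction.hpp>
#include <boost/statechart/event.hpp>
#include <boost/mpl/list.hpp>

namespace sc = boost::statechart;

// events
struct Toggle : sc::event< Toggle > {};
struct Query  : sc::event< Query > {};

struct Off;                                        // initial state, defined below

// the state machine: its own type plus the initial state
struct Switch : sc::state_machine< Switch, Off > {};

struct On : sc::simple_state< On, Switch > {
  typedef boost::mpl::list<
    sc::transition< Toggle, Off >,                 // plain transition
    sc::custom_reaction< Query >                   // handled in react()
  > reactions;
  sc::result react(const Query&) {
    std::cout << "state: On" << std::endl;
    return discard_event();
  }
};

struct Off : sc::simple_state< Off, Switch > {
  typedef boost::mpl::list<
    sc::transition< Toggle, On >,
    sc::custom_reaction< Query >
  > reactions;
  sc::result react(const Query&) {
    std::cout << "state: Off" << std::endl;
    return discard_event();
  }
};

int main() {
  Switch sw;
  sw.initiate();              // enters the initial state, Off
  sw.process_event(Query());  // prints "state: Off"
  sw.process_event(Toggle()); // Off -> On
  sw.process_event(Query());  // prints "state: On"
}

The PG state machine's Initial state, shown next, follows exactly the same pattern: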
struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  explicit Initial(my_context ctx);
  void exit();

  typedef boost::mpl::list <
    boost::statechart::transition< Initialize, Reset >,
    boost::statechart::custom_reaction< Load >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;

  boost::statechart::result react(const Load&);
  boost::statechart::result react(const MNotifyRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
The main states of the PG state machine are shown in the figure below:
Primary Peering
The state-machine flow during Primary peering is shown in the figure below; it does not cover the replica-OSD side or the Recovery and Backfill processes.
The primary OSD starts creating the PG when it receives a pg_create message:
void OSD::handle_pg_create(OpRequestRef op)
{
  MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
  assert(m->get_type() == MSG_OSD_PG_CREATE);

  dout(10) << "handle_pg_create " << *m << dendl;
  ...
The call chain handle_pg_create -> handle_pg_peering_evt -> _create_lock_pg -> _open_lock_pg -> _make_pg creates the PG.
PG* OSD::_make_pg(
  OSDMapRef createmap,
  spg_t pgid)
{
  dout(10) << "_open_lock_pg " << pgid << dendl;
  PGPool pool = _get_pool(pgid.pool(), createmap);

  // create
  PG *pg;
  if (createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_REPLICATED ||
      createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_ERASURE)
    pg = new ReplicatedPG(&service, createmap, pool, pgid);
  else
    assert(0);

  return pg;
}
Once the PG exists, event delivery and handling start in handle_pg_create -> handle_pg_peering_evt -> handle_create:
void PG::handle_create(RecoveryCtx *rctx)
{
  dout(10) << "handle_create" << dendl;
  rctx->created_pgs.insert(this);
  Initialize evt;
  recovery_state.handle_event(evt, rctx);

  ActMap evt2;
  recovery_state.handle_event(evt2, rctx);
}
Right after creation the state machine is in the Initial state. handle_create builds an Initialize event, and the state machine's handle_event function calls process_event to deliver it.
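handle_event itself is only a thin wrapper around the Boost machine; roughly, from the jewel-era RecoveryState in PG.h:

void handle_event(const boost::statechart::event_base &evt,
                  RecoveryCtx *rctx) {
  start_handle(rctx);          // stash the RecoveryCtx for the reactions to use
  machine.process_event(evt);  // dispatch through the boost::statechart machine
  end_handle();
}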
As the reactions list of Initial shows, when the Initialize event arrives the machine transitions directly to the Reset state: the Reset constructor runs first, then control returns to handle_create, which builds the ActMap event and delivers it as well.
When Reset receives ActMap, the event is handled in the corresponding react function:
boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
{
  PG *pg = context< RecoveryMachine >().pg;
  if (pg->should_send_notify() && pg->get_primary().osd >= 0) {
    context< RecoveryMachine >().send_notify(
      pg->get_primary(),
      pg_notify_t(
        pg->get_primary().shard, pg->pg_whoami.shard,
        pg->get_osdmap()->get_epoch(),
        pg->get_osdmap()->get_epoch(),
        pg->info),
      pg->past_intervals);
  }

  pg->update_heartbeat_peers();
  pg->take_waiters();

  return transit< Started >();
}
Here the machine transitions to the Started state. Started declares an initial substate (the third template parameter below), so the machine immediately drops into the Start substate.
struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
  explicit Started(my_context ctx);
  void exit();

  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< AdvMap >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::custom_reaction< FlushedEvt >,
    boost::statechart::custom_reaction< IntervalFlush >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;

  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const FlushedEvt&);
  boost::statechart::result react(const IntervalFlush&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
Looking at the Start state's constructor:
/*-------Start---------*/
PG::RecoveryState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Start")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  if (pg->is_primary()) {
    dout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());
  } else { //is_stray
    dout(1) << "transitioning to Stray" << dendl;
    post_event(MakeStray());
  }
}
The constructor fetches the PG being handled and checks whether the current OSD is the PG's primary: if so it posts a MakePrimary event, otherwise a MakeStray event. A PG instance that ends up in the Stray state must wait for the current primary to confirm its role based on the progress and outcome of peering.
Following the primary OSD's path: after Start receives MakePrimary, the machine walks into Started/Primary/Peering/GetInfo and the peering phase begins.
GetInfo collects the PG's pg_info_t from the other OSDs:
- generate_past_intervals computes the past intervals
- build_prior builds the prior set, i.e. the list of OSDs whose pg_info_t must be fetched
- get_infos sends the query to each participating OSD
/*--------GetInfo---------*/
PG::RecoveryState::GetInfo::GetInfo(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetInfo")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  pg->generate_past_intervals();
  unique_ptr<PriorSet> &prior_set = context< Peering >().prior_set;

  assert(pg->blocked_by.empty());

  if (!prior_set.get())
    pg->build_prior(prior_set);

  pg->reset_min_peer_features();
  get_infos();
  if (peer_info_requested.empty() && !prior_set->pg_down) {
    post_event(GotInfo());
  }
}
When a pg_info reply arrives at the primary OSD, it is wrapped in an MNotifyRec event and delivered to the state machine. The corresponding react function processes the received pg_info; once every queried OSD has responded, a GotInfo event is posted.
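A condensed sketch of that handler, based on the jewel-era source (the merging of the peer's info via proc_replica_info and the prior-set rebuilding are elided):

boost::statechart::result PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)
{
  PG *pg = context< RecoveryMachine >().pg;

  // this peer has answered; it no longer blocks peering
  set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
  if (p != peer_info_requested.end()) {
    peer_info_requested.erase(p);
    pg->blocked_by.erase(infoevt.from.osd);
  }

  // merge the peer's pg_info_t; this may extend the prior set and
  // trigger another get_infos() round (elided)
  ...

  // all queried OSDs have answered and no needed interval is down
  unique_ptr<PriorSet> &prior_set = context< Peering >().prior_set;
  if (peer_info_requested.empty() && !prior_set->pg_down) {
    post_event(GotInfo());
  }
  return discard_event();
}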
On GotInfo the state machine moves to the GetLog state; in GetLog's constructor:
/*------GetLog------------*/
PG::RecoveryState::GetLog::GetLog(my_context ctx)
  : my_base(ctx),
    NamedState(
      context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetLog"),
    msg(0)
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;

  // adjust acting?
  if (!pg->choose_acting(auth_log_shard, false,
                         &context< Peering >().history_les_bound)) {
    if (!pg->want_acting.empty()) {
      post_event(NeedActingChange());
    } else {
      post_event(IsIncomplete());
    }
    return;
  }
  ...
choose_acting selects the OSD that holds the authoritative log and computes the want_acting list.
- If the primary itself holds the authoritative log, nothing needs to be pulled: a GotLog event is posted directly and the machine moves on to the next state.
- Otherwise the log must be pulled from the OSD that holds the authoritative one and merged with the local log. The primary sends a pg_query_t::LOG query to that OSD, as sketched below.
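The request side lives in the portion of the GetLog constructor elided above. A condensed sketch, assuming the jewel-era code (the computation of request_log_from, the lower bound of the log range to fetch, is elided):

// inside PG::RecoveryState::GetLog::GetLog(), after choose_acting():
if (auth_log_shard == pg->pg_whoami) {
  // we already hold the authoritative log; nothing to pull
  post_event(GotLog());
  return;
}
...
dout(10) << " requesting log from osd." << auth_log_shard << dendl;
context< RecoveryMachine >().send_query(
  auth_log_shard,
  pg_query_t(
    pg_query_t::LOG,
    auth_log_shard.shard, pg->pg_whoami.shard,
    request_log_from, pg->info.history,
    pg->get_osdmap()->get_epoch()));

When the authoritative log arrives, it is wrapped in an MLogRec event and handled in the following react function: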
boost::statechart::result PG::RecoveryState::GetLog::react(const MLogRec& logevt)
{
  assert(!msg);
  if (logevt.from != auth_log_shard) {
    dout(10) << "GetLog: discarding log from "
             << "non-auth_log_shard osd." << logevt.from << dendl;
    return discard_event();
  }
  dout(10) << "GetLog: received master log from osd"
           << logevt.from << dendl;
  msg = logevt.msg;
  post_event(GotLog());
  return discard_event();
}
After GotLog is posted, the machine moves to the GetMissing state; in GetMissing's constructor:
/*------GetMissing--------*/
PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetMissing")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  assert(!pg->actingbackfill.empty());
  for (set<pg_shard_t>::iterator i = pg->actingbackfill.begin();
       i != pg->actingbackfill.end();
       ++i) {
    if (*i == pg->get_primary()) continue;
    const pg_info_t& pi = pg->peer_info[*i];

    if (pi.is_empty())
      continue;                        // no pg data, nothing divergent

    if (pi.last_update < pg->pg_log.get_tail()) {
      dout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl;
      pg->peer_missing[*i];
      continue;
    }
    if (pi.last_backfill == hobject_t()) {
      dout(10) << " osd." << *i << " will fully backfill; can infer empty missing set" << dendl;
      pg->peer_missing[*i];
      continue;
    }
    ...
GetMissing first pulls each replica OSD's usable log, then compares the primary's authoritative log against each replica's log. This yields, per replica, the set of divergent objects, which is stored in that replica's pg_missing_t structure and later drives data repair. For example, if the authoritative head is version 15'42 and a replica reports last_update 15'40, the objects modified by log entries 15'41 and 15'42 end up in that replica's missing set.
Once every log request has returned and been processed, the state machine posts Activate(pg->get_osdmap()->get_epoch()) and enters the Active state:
// all good!
post_event(Activate(pg->get_osdmap()->get_epoch()));
At this point the main work of peering is done. Before the PG can serve client reads and writes, however, it still has to be activated on every replica. Activation persists the local peering results, so that a power failure cannot undo them, and initializes the key metadata that the background Recovery or Backfill will later depend on.
pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
             pg->get_osdmap()->get_epoch(),
             *context< RecoveryMachine >().get_on_safe_context_list(),
             *context< RecoveryMachine >().get_query_map(),
             context< RecoveryMachine >().get_info_map(),
             context< RecoveryMachine >().get_recovery_ctx());
Once all replicas have been activated, an AllReplicasActivated event is posted and handled in the Active state; during that handling, pg->on_activate() is called. The function checks whether Recovery is needed and, if so, queues a DoRecovery event; it then checks whether Backfill is needed and, if so, queues a RequestBackfill event.
void ReplicatedPG::on_activate()
{
  // all clean?
  if (needs_recovery()) {
    dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
    queue_peering_event(
      CephPeeringEvtRef(
        std::make_shared<CephPeeringEvt>(
          get_osdmap()->get_epoch(),
          get_osdmap()->get_epoch(),
          DoRecovery())));
  } else if (needs_backfill()) {
    dout(10) << "activate queueing backfill" << dendl;
    queue_peering_event(
      CephPeeringEvtRef(
        std::make_shared<CephPeeringEvt>(
          get_osdmap()->get_epoch(),
          get_osdmap()->get_epoch(),
          RequestBackfill())));
  } else {
    dout(10) << "activate all replicas clean, no recovery" << dendl;
    queue_peering_event(
      CephPeeringEvtRef(
        std::make_shared<CephPeeringEvt>(
          get_osdmap()->get_epoch(),
          get_osdmap()->get_epoch(),
          AllReplicasRecovered())));
  }
...
Recovery
If the primary detects that it or any peer has objects awaiting repair, it posts a DoRecovery event to the state machine and switches to the Started/Primary/Active/WaitLocalRecoveryReserved state, preparing to run Recovery.
Recovery repairs inconsistent objects purely from the missing-object records in the PG log.
When the Activating state receives DoRecovery, it transitions to WaitLocalRecoveryReserved:
struct Activating : boost::statechart::state< Activating, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< AllReplicasRecovered, Recovered >,
    boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
    boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved >
    > reactions;
  explicit Activating(my_context ctx);
  void exit();
};
...
In the WaitLocalRecoveryReserved constructor, request_reservation requests a local reservation. Reservations cap the number of PGs an OSD repairs concurrently, and must be obtained both on the primary OSD and on each replica OSD.
PG::RecoveryState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Active/WaitLocalRecoveryReserved")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  pg->state_set(PG_STATE_RECOVERY_WAIT);
  pg->osd->local_reserver.request_reservation(
    pg->info.pgid,
    new QueuePeeringEvt<LocalRecoveryReserved>(
      pg, pg->get_osdmap()->get_epoch(),
      LocalRecoveryReserved()),
    pg->get_recovery_priority());
  pg->publish_stats_to_osd();
}
When the reservation is granted, a LocalRecoveryReserved event arrives, marking the local reservation complete, and the machine transitions to WaitRemoteRecoveryReserved:
struct WaitLocalRecoveryReserved : boost::statechart::state< WaitLocalRecoveryReserved, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< LocalRecoveryReserved, WaitRemoteRecoveryReserved >
    > reactions;
  explicit WaitLocalRecoveryReserved(my_context ctx);
  void exit();
};
The WaitRemoteRecoveryReserved state obtains the remote reservations. Each RemoteRecoveryReserved event acknowledges one remote reservation; once all of them are in, an AllRemotesReserved event is posted, marking that the PG has reserved resources on every replica OSD participating in the repair, and the machine enters the Recovering state.
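A condensed sketch of how the remote reservations are chained, one shard at a time, assuming the jewel-era code (connection handling simplified):

boost::statechart::result
PG::RecoveryState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved &evt)
{
  PG *pg = context< RecoveryMachine >().pg;

  if (remote_recovery_reservation_it !=
      context< Active >().remote_shards_to_reserve_recovery.end()) {
    // ask the next replica shard for a reservation; its grant comes
    // back as another RemoteRecoveryReserved event
    pg->osd->send_message_osd_cluster(
      remote_recovery_reservation_it->osd,
      new MRecoveryReserve(
        MRecoveryReserve::REQUEST,
        spg_t(pg->info.pgid.pgid, remote_recovery_reservation_it->shard),
        pg->get_osdmap()->get_epoch()),
      pg->get_osdmap()->get_epoch());
    ++remote_recovery_reservation_it;
  } else {
    // every participating shard is reserved
    post_event(AllRemotesReserved());
  }
  return discard_event();
}

The Recovering constructor then queues the PG for the actual repair work: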
PG::RecoveryState::Recovering::Recovering(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Active/Recovering")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  pg->state_clear(PG_STATE_RECOVERY_WAIT);
  pg->state_set(PG_STATE_RECOVERING);
  pg->publish_stats_to_osd();
  pg->osd->queue_for_recovery(pg);
}
The actual data repair happens in the Recovering state. The constructor sets the PG state to PG_STATE_RECOVERING and adds the PG to the recovery_wq work queue, which starts the repair. Recovery is driven and controlled by the PG's primary OSD: the primary is repaired first, then the replicas. In recovery_wq, the work queue's thread-pool handler calls do_recovery to do the actual work:
void _process(PG *pg, ThreadPool::TPHandle &handle) override {
  osd->do_recovery(pg, handle);
  pg->put("RecoveryWQ");
}
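do_recovery throttles against the osd_recovery_max_active limit before handing off to the PG. A condensed sketch, assuming the jewel-era structure (locking, sleep handling, and requeueing are elided):

void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
{
  // global throttle: at most osd_recovery_max_active recovery ops
  // may be active on this OSD at once
  int max = cct->_conf->osd_recovery_max_active - recovery_ops_active;
  if (max <= 0) {
    // over the limit: requeue the PG and try again later (elided)
    ...
  }

  pg->lock_suspend_timeout(handle);
  int started = 0;
  bool more = pg->start_recovery_ops(max, handle, &started);
  // if more objects remain, the PG is queued for another round (elided)
  ...
  pg->unlock();
}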
ReplicatedPG::start_recovery_ops carries out the repair for replicated PGs:
bool ReplicatedPG::start_recovery_ops(
  int max, ThreadPool::TPHandle &handle,
  int *ops_started)
{
  int& started = *ops_started;
  started = 0;
  bool work_in_progress = false;
  ...
  if (num_missing == num_unfound) {
    // all of the primary's missing objects are unfound: recover the replicas
    started = recover_replicas(max, handle);
  }
  if (!started) {
    // We still have missing objects that we should grab from replicas.
    started += recover_primary(max, handle);
  }
  ...
ReplicatedPG::recover_primary repairs the objects missing on the PG's primary OSD.
Once the Recovering state has finished the Recovery work: if Backfill is still needed, a RequestBackfill event is accepted and the Backfill flow begins.
If no Backfill is needed, an AllReplicasRecovered event is accepted and the machine moves to the Recovered state. Reaching Recovered means data repair is complete; when a GoClean event then arrives, the PG enters the Clean state.
Backfill
Backfill is similar to Recovery: if the primary finds replicas that can only be repaired by Backfill, it runs Backfill.
Backfill rescans all of the PG's objects, finds the missing ones by comparison, and repairs them by copying whole objects. Like Recovery, it must reserve resources first.
I will not cover that part here…