
A Code Walkthrough of PG State Changes During Peering

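I recently went through how PG states change. I have not studied the code in detail yet; this is only a first, rough pass over it, written down here for reference.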

PG

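A PG (placement group) is the basic unit of a storage pool: a collection of objects. Both data-protection strategies, multi-replica and erasure coding, are implemented on top of PGs. A PG has many states, and the transitions between them are driven by a state machine.
Two scenarios trigger the peering process: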

  • when a PG is created
  • when an OSD starting or stopping changes the OSDMap, which in turn changes the PG's acting set
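
The two triggers can be written down as a single check. This is only a minimal sketch: PgView and needs_peering are hypothetical names invented for illustration, OSDs are plain integers, and only the acting set is compared, which is the one condition named above.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified view of what one OSD knows about a PG.
struct PgView {
  bool exists;              // false until the PG has been created locally
  std::vector<int> acting;  // OSD ids in the acting set, primary first
};

// Returns true when a peering round should start for this PG.
bool needs_peering(const PgView& before, const std::vector<int>& new_acting) {
  if (!before.exists) return true;     // trigger 1: the PG is being created
  return before.acting != new_acting;  // trigger 2: the acting set changed
}
```

For example, a PG whose acting set stays {0, 1, 2} across an OSDMap update does not re-peer in this sketch, while a change to {0, 1, 3} does.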

State machine

The state machine is initialized when the PG is created.

class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
    RecoveryState *state;
  public:
    PG *pg;

    utime_t event_time;
    uint64_t event_count;

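boost::statechart is built from the following objects:

  • state_machine: the state machine itself
  • state: a state
  • event: an event, delivered to the machine by calling process_event
  • transition / custom_reaction: a transition or a reaction; a custom_reaction is handled by the matching react function

The objects, states and event handling of the PG state machine live mainly in PG.h and PG.cc.
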
struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
  explicit Initial(my_context ctx);
  void exit();

  typedef boost::mpl::list <
    boost::statechart::transition< Initialize, Reset >,
    boost::statechart::custom_reaction< Load >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;

  boost::statechart::result react(const Load&);
  boost::statechart::result react(const MNotifyRec&);
  boost::statechart::result react(const MInfoRec&);
  boost::statechart::result react(const MLogRec&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};
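
The reactions list is consulted from top to bottom, so the last entry, a transition on event_base, acts as a catch-all. The sketch below mimics that lookup for Initial without using Boost. The Event and State enums and initial_react are hypothetical stand-ins. What the real react overloads do for Load and NullEvt is not shown in this walkthrough, so the sketch simply stays in Initial for those two.

```cpp
#include <cassert>

// Event and state names come from the Initial declaration; the dispatch
// function is a hypothetical stand-in for boost::statechart.
enum class Event { Initialize, Load, NullEvt, Other };
enum class State { Initial, Reset, Crashed };

// Mimics how the reactions list of Initial is searched, first match wins.
State initial_react(Event e) {
  switch (e) {
    case Event::Initialize:
      return State::Reset;    // transition< Initialize, Reset >
    case Event::Load:
    case Event::NullEvt:
      return State::Initial;  // custom_reaction: react() decides (assumed: stay)
    default:
      return State::Crashed;  // transition< event_base, Crashed > catch-all
  }
}
```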

The main states of the PG state machine are shown in the figure below:
[Figure: PG state machine]

Primary Peering

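The figure below shows the state machine flow during Primary Peering. It does not show the replica OSDs or the Recovery and Backfill processes.
[Figure: partial PG state transitions]
The primary OSD starts creating the PG when it receives a pg_create message.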

void OSD::handle_pg_create(OpRequestRef op)
{
  MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
  assert(m->get_type() == MSG_OSD_PG_CREATE);

  dout(10) << "handle_pg_create " << *m << dendl;

...

The call chain handle_pg_create -> handle_pg_peering_evt -> _create_lock_pg -> _open_lock_pg -> _make_pg creates the PG.

PG* OSD::_make_pg(
  OSDMapRef createmap,
  spg_t pgid)
{
  dout(10) << "_open_lock_pg " << pgid << dendl;
  PGPool pool = _get_pool(pgid.pool(), createmap);

  // create
  PG *pg;
  if (createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_REPLICATED ||
      createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_ERASURE)
    pg = new ReplicatedPG(&service, createmap, pool, pgid);
  else
    assert(0);

  return pg;
}

Once the PG has been created, event delivery and handling begin in handle_create, reached through handle_pg_create -> handle_pg_peering_evt -> handle_create.

void PG::handle_create(RecoveryCtx *rctx)
{
  dout(10) << "handle_create" << dendl;
  rctx->created_pgs.insert(this);
  Initialize evt;
  recovery_state.handle_event(evt, rctx);
  ActMap evt2;
  recovery_state.handle_event(evt2, rctx);
}

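Right after creation the state machine is in the Initial state. handle_create builds an Initialize event there, and the state machine's handle_event function calls process_event to deliver it.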

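As the reactions list of Initial shows, receiving an Initialize event in the Initial state moves the state machine straight to the Reset state. The Reset constructor runs first, then control returns to handle_create with the machine in its new state, and handle_create goes on to build an ActMap event and deliver it.
When Reset receives the ActMap event, it is handled in the corresponding react function.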

boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
{
  PG *pg = context< RecoveryMachine >().pg;
  if (pg->should_send_notify() && pg->get_primary().osd >= 0) {
    context< RecoveryMachine >().send_notify(
      pg->get_primary(),
      pg_notify_t(
 pg->get_primary().shard, pg->pg_whoami.shard,
 pg->get_osdmap()->get_epoch(),
 pg->get_osdmap()->get_epoch(),
 pg->info),
      pg->past_intervals);
  }

  pg->update_heartbeat_peers();
  pg->take_waiters();

  return transit< Started >();
}

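This react function transitions directly to the Started state. Because Started declares an initial sub-state, the machine immediately descends into the Start sub-state.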

struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
  explicit Started(my_context ctx);
  void exit();

  typedef boost::mpl::list <
    boost::statechart::custom_reaction< QueryState >,
    boost::statechart::custom_reaction< AdvMap >,
    boost::statechart::custom_reaction< NullEvt >,
    boost::statechart::custom_reaction< FlushedEvt >,
    boost::statechart::custom_reaction< IntervalFlush >,
    boost::statechart::transition< boost::statechart::event_base, Crashed >
    > reactions;

  boost::statechart::result react(const QueryState& q);
  boost::statechart::result react(const AdvMap&);
  boost::statechart::result react(const FlushedEvt&);
  boost::statechart::result react(const IntervalFlush&);
  boost::statechart::result react(const boost::statechart::event_base&) {
    return discard_event();
  }
};

Next, look at the constructor of the Start state:

/*-------Start---------*/
PG::RecoveryState::Start::Start(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Start")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  if (pg->is_primary()) {
    dout(1) << "transitioning to Primary" << dendl;
    post_event(MakePrimary());
  } else { //is_stray
    dout(1) << "transitioning to Stray" << dendl;
    post_event(MakeStray());
  }
}

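The constructor first fetches the PG being processed and checks whether the current OSD is that PG's primary. If it is, a MakePrimary event is posted; otherwise a MakeStray event is posted. A PG instance that enters the Stray state has to wait for the current Primary to confirm its role according to the progress and outcome of peering.
Following the primary OSD's path: after Start receives the MakePrimary event, the state machine enters Started/Primary/Peering/GetInfo in turn, and the peering phase begins.
The GetInfo step collects this PG's pg_info_t from the other OSDs:

  • generate_past_intervals computes the past intervals
  • build_prior builds the list of OSDs whose pg_info_t must be fetched
  • get_infos sends the query to each participating OSD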
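
Entering a composite state also enters its initial inner state, which is how a single MakePrimary event ends up in Started/Primary/Peering/GetInfo. The descent can be sketched with a small lookup table. Here initial_substate and entry_sequence are hypothetical helpers; the Started -> Start entry comes from the Started declaration, and the Primary -> Peering -> GetInfo chain is inferred from the state path.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Initial inner state of each composite state named in this walkthrough.
const std::map<std::string, std::string>& initial_substate() {
  static const std::map<std::string, std::string> table = {
      {"Started", "Start"},
      {"Primary", "Peering"},
      {"Peering", "GetInfo"},
  };
  return table;
}

// Lists the states whose constructors run, in order, when `target` is entered.
std::vector<std::string> entry_sequence(const std::string& target) {
  std::vector<std::string> entered{target};
  auto it = initial_substate().find(target);
  while (it != initial_substate().end()) {
    entered.push_back(it->second);
    it = initial_substate().find(it->second);
  }
  return entered;
}
```

In this sketch, entering Primary runs the constructors of Primary, Peering and GetInfo in that order, so the GetInfo constructor is the first place where peering work actually happens.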

/*--------GetInfo---------*/
PG::RecoveryState::GetInfo::GetInfo(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetInfo")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  pg->generate_past_intervals();
  unique_ptr<PriorSet> &prior_set = context< Peering >().prior_set;

  assert(pg->blocked_by.empty());

  if (!prior_set.get())
    pg->build_prior(prior_set);

  pg->reset_min_peer_features();
  get_infos();
  if (peer_info_requested.empty() && !prior_set->pg_down) {
    post_event(GotInfo());
  }
}
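
The final check in this constructor posts GotInfo right away when no request is outstanding. The bookkeeping behind that check might look like the sketch below. InfoTracker is a hypothetical class, OSDs are plain integers, and the prior_set->pg_down condition is deliberately left out.

```cpp
#include <cassert>
#include <set>

// Hypothetical tracker for outstanding pg_info_t queries.
class InfoTracker {
 public:
  // Records a query for every prior-set member except this OSD itself.
  void request_from(const std::set<int>& prior_set, int self) {
    for (int osd : prior_set)
      if (osd != self) requested_.insert(osd);
  }

  // Handles one reply; returns true when GotInfo should be posted.
  bool on_notify(int from) {
    requested_.erase(from);
    return requested_.empty();
  }

  // True when no query is outstanding.
  bool done() const { return requested_.empty(); }

 private:
  std::set<int> requested_;
};
```

A primary whose prior set contains only itself has nothing to request, so done() is true immediately, which corresponds to posting GotInfo from the constructor.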

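When the primary OSD receives the reply to a pg_info query, it wraps the reply in an MNotifyRec event and delivers it to the state machine.
The corresponding react function processes the fetched pg_info. Once every replica OSD has returned its info in the GetInfo state, a GotInfo event is posted.
On receiving GotInfo the state machine moves to the GetLog state, whose constructor does the following: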

/*------GetLog------------*/
PG::RecoveryState::GetLog::GetLog(my_context ctx)
  : my_base(ctx),
    NamedState(
      context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetLog"),
    msg(0)
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;

  // adjust acting?
  if (!pg->choose_acting(auth_log_shard, false,
    &context< Peering >().history_les_bound)) {
    if (!pg->want_acting.empty()) {
      post_event(NeedActingChange());
    } else {
      post_event(IsIncomplete());
    }
    return;
  }
...

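choose_acting selects the OSD that holds the authoritative log and computes the want_acting list.
If the primary itself holds the authoritative log, nothing needs to be pulled, and a GotLog event is posted directly to move to the next state.
Otherwise the log has to be pulled from the OSD that holds it and merged with the local log. The pull is done by sending a pg_query_t::LOG query to that OSD; when the authoritative log arrives, it is wrapped in an MLogRec event.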

boost::statechart::result PG::RecoveryState::GetLog::react(const MLogRec& logevt)
{
  assert(!msg);
  if (logevt.from != auth_log_shard) {
    dout(10) << "GetLog: discarding log from "
      << "non-auth_log_shard osd." << logevt.from << dendl;
    return discard_event();
  }
  dout(10) << "GetLog: received master log from osd"
    << logevt.from << dendl;
  msg = logevt.msg;
  post_event(GotLog());
  return discard_event();
}
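
The pull-or-skip decision can be illustrated with a deliberately simplified ranking. This is an assumption made for illustration only: the sketch picks the OSD with the newest last_update and prefers the primary on a tie, whereas the real choose_acting weighs more criteria. pick_auth_log and must_pull_log are hypothetical names.

```cpp
#include <cassert>
#include <map>

// Picks the OSD treated as holding the authoritative log, given each OSD's
// last_update as a plain counter. Returns -1 when no info is available.
// Assumption: newest last_update wins, and `self` wins a tie.
int pick_auth_log(const std::map<int, unsigned>& last_update, int self) {
  int best = -1;
  unsigned best_version = 0;
  for (const auto& kv : last_update) {
    bool newer = best < 0 || kv.second > best_version;
    bool tie_prefers_self =
        best >= 0 && kv.second == best_version && kv.first == self;
    if (newer || tie_prefers_self) {
      best = kv.first;
      best_version = kv.second;
    }
  }
  return best;
}

// True when the primary must fetch the log instead of posting GotLog at once.
bool must_pull_log(int auth_osd, int self) { return auth_osd != self; }
```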

After GotLog is posted, the machine moves to the GetMissing state and runs the GetMissing constructor:

/*------GetMissing--------*/
PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Peering/GetMissing")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  assert(!pg->actingbackfill.empty());
  for (set<pg_shard_t>::iterator i = pg->actingbackfill.begin();
       i != pg->actingbackfill.end();
       ++i) {
    if (*i == pg->get_primary()) continue;
    const pg_info_t& pi = pg->peer_info[*i];

    if (pi.is_empty())
      continue;                                // no pg data, nothing divergent

    if (pi.last_update < pg->pg_log.get_tail()) {
      dout(10) << " osd." << *i << " is not contiguous, will restart backfill" << dendl;
      pg->peer_missing[*i];
      continue;
    }
    if (pi.last_backfill == hobject_t()) {
      dout(10) << " osd." << *i << " will fully backfill; can infer empty missing set" << dendl;
      pg->peer_missing[*i];
      continue;
    }
...
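
The early exits in this loop amount to a small classification of each peer. The sketch below restates them with flattened, hypothetical types (PeerInfo, PeerClass, classify_peer). Versions are plain counters, and whatever the elided remainder of the constructor does is represented only by the NeedsFurtherChecks value.

```cpp
#include <cassert>

// Hypothetical, flattened stand-in for the pg_info_t fields used above.
struct PeerInfo {
  bool empty;                // pi.is_empty()
  unsigned last_update;      // pi.last_update, as a plain counter
  bool backfill_from_start;  // pi.last_backfill == hobject_t()
};

enum class PeerClass {
  NoData,             // no PG data on the peer, nothing divergent
  NotContiguous,      // log gap: will restart backfill, empty missing set
  FullBackfill,       // will fully backfill, empty missing set
  NeedsFurtherChecks  // handled by the part of the constructor not shown
};

// Applies the checks in the same order as the GetMissing constructor.
PeerClass classify_peer(const PeerInfo& pi, unsigned primary_log_tail) {
  if (pi.empty) return PeerClass::NoData;
  if (pi.last_update < primary_log_tail) return PeerClass::NotContiguous;
  if (pi.backfill_from_start) return PeerClass::FullBackfill;
  return PeerClass::NeedsFurtherChecks;
}
```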

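GetMissing first pulls the valid log from each replica OSD, then compares the primary OSD's authoritative log with each replica's log. The objects found to be inconsistent on each replica are recorded in that replica's pg_missing_t structure, which later drives data repair.
Once every log request has returned and been processed, an Activate(pg->get_osdmap()->get_epoch()) event is posted and the machine enters the Active state.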

// all good!
post_event(Activate(pg->get_osdmap()->get_epoch()));
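
The log comparison performed during GetMissing can be sketched as follows. LogEntry and compute_missing are hypothetical, versions are plain counters, and the result is a plain map standing in for pg_missing_t, which in Ceph carries more information than shown here.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical, minimal log entry: which object changed and at which version.
struct LogEntry {
  unsigned version;
  std::string object;
};

// Returns object -> version needed, for every object the authoritative log
// modified after the replica's last_update. Entries are assumed to be in
// ascending version order, so later entries overwrite earlier ones.
std::map<std::string, unsigned> compute_missing(
    const std::vector<LogEntry>& auth_log, unsigned replica_last_update) {
  std::map<std::string, unsigned> missing;
  for (const LogEntry& e : auth_log)
    if (e.version > replica_last_update) missing[e.object] = e.version;
  return missing;
}
```

A replica whose last_update equals the newest entry in the authoritative log gets an empty map in this sketch, meaning nothing has to be repaired on it.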

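At this point the main work of peering is done. Before client reads and writes can be accepted, however, the activate operation still has to run to activate every replica. Its main purpose is to persist the local peering result so that a power loss does not throw the work away; it also initializes the key metadata that the background Recovery or Backfill will depend on.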

  pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
        pg->get_osdmap()->get_epoch(),
        *context< RecoveryMachine >().get_on_safe_context_list(),
        *context< RecoveryMachine >().get_query_map(),
        context< RecoveryMachine >().get_info_map(),
        context< RecoveryMachine >().get_recovery_ctx());

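Once all replicas have been activated, an AllReplicasActivated event is posted. It is handled in the Active state, and the handler calls pg->on_activate().

on_activate checks whether Recovery is needed and, if so, queues a DoRecovery event; otherwise it checks whether Backfill is needed and, if so, queues a RequestBackfill event.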


void ReplicatedPG::on_activate()
{
  // all clean?
  if (needs_recovery()) {
    dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
    queue_peering_event(
      CephPeeringEvtRef(
 std::make_shared<CephPeeringEvt>(
   get_osdmap()->get_epoch(),
   get_osdmap()->get_epoch(),
   DoRecovery())));
  } else if (needs_backfill()) {
    dout(10) << "activate queueing backfill" << dendl;
    queue_peering_event(
      CephPeeringEvtRef(
 std::make_shared<CephPeeringEvt>(
   get_osdmap()->get_epoch(),
   get_osdmap()->get_epoch(),
   RequestBackfill())));
  } else {
    dout(10) << "activate all replicas clean, no recovery" << dendl;
    queue_peering_event(
      CephPeeringEvtRef(
 std::make_shared<CephPeeringEvt>(
   get_osdmap()->get_epoch(),
   get_osdmap()->get_epoch(),
   AllReplicasRecovered())));
  }
...

Recovery

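If the Primary detects that it or any peer has objects awaiting repair, it posts a DoRecovery event to the state machine, which switches to the Started/Primary/Active/WaitLocalRecoveryReserved state and starts preparing for Recovery.
Recovery repairs inconsistent objects using only the missing records derived from the PG log.
When the Activating state receives a DoRecovery event, it transitions to WaitLocalRecoveryReserved.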

struct Activating : boost::statechart::state< Activating, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< AllReplicasRecovered, Recovered >,
    boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
    boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved >
    > reactions;
  explicit Activating(my_context ctx);
  void exit();
};
...

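The WaitLocalRecoveryReserved constructor requests a resource reservation through request_reservation. Reservations cap the number of PGs being repaired on one OSD at the same time, and one is required on the primary OSD as well as on the replica OSDs.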

PG::RecoveryState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Active/WaitLocalRecoveryReserved")
{
  context< RecoveryMachine >().log_enter(state_name);
  PG *pg = context< RecoveryMachine >().pg;
  pg->state_set(PG_STATE_RECOVERY_WAIT);
  pg->osd->local_reserver.request_reservation(
    pg->info.pgid,
    new QueuePeeringEvt<LocalRecoveryReserved>(
      pg, pg->get_osdmap()->get_epoch(),
      LocalRecoveryReserved()),
    pg->get_recovery_priority());
  pg->publish_stats_to_osd();
}
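
How a reserver limits the number of PGs repairing at once can be sketched with a small priority queue. The Reserver class below is a hypothetical stand-in, not the type behind local_reserver: the callback is a std::function rather than a QueuePeeringEvt, and release is an invented name for giving a slot back.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <set>
#include <string>
#include <utility>

// Hypothetical reserver: at most `max_allowed` PGs hold a slot at a time.
class Reserver {
 public:
  explicit Reserver(std::size_t max_allowed) : max_allowed_(max_allowed) {}

  // Queues a request; `on_granted` runs once a slot is free.
  void request_reservation(const std::string& pgid,
                           std::function<void()> on_granted,
                           unsigned priority) {
    waiting_.push(Request{priority, next_seq_++, pgid, std::move(on_granted)});
    grant_waiting();
  }

  // Frees the slot held by `pgid` and lets the next waiter in.
  void release(const std::string& pgid) {
    in_progress_.erase(pgid);
    grant_waiting();
  }

 private:
  struct Request {
    unsigned priority;
    unsigned long seq;
    std::string pgid;
    std::function<void()> on_granted;
    bool operator<(const Request& o) const {
      if (priority != o.priority) return priority < o.priority;
      return seq > o.seq;  // among equal priorities, the older request wins
    }
  };

  // Grants slots to the highest-priority waiters while capacity remains.
  void grant_waiting() {
    while (!waiting_.empty() && in_progress_.size() < max_allowed_) {
      Request r = waiting_.top();
      waiting_.pop();
      in_progress_.insert(r.pgid);
      r.on_granted();
    }
  }

  std::size_t max_allowed_;
  unsigned long next_seq_ = 0;
  std::priority_queue<Request> waiting_;
  std::set<std::string> in_progress_;
};
```

In the walkthrough's terms, the on_granted callback plays the role of queueing the LocalRecoveryReserved event for the PG.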

Receiving the LocalRecoveryReserved event means the local reservation has been granted; the machine then moves to the WaitRemoteRecoveryReserved state.

struct WaitLocalRecoveryReserved : boost::statechart::state< WaitLocalRecoveryReserved, Active >, NamedState {
  typedef boost::mpl::list <
    boost::statechart::transition< LocalRecoveryReserved, WaitRemoteRecoveryReserved >
    > reactions;
  explicit WaitLocalRecoveryReserved(my_context ctx);
  void exit();
};

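The WaitRemoteRecoveryReserved state completes the reservations on the remote OSDs. Receiving RemoteRecoveryReserved indicates that the reservations have been granted; an AllRemotesReserved event is then posted, which means the PG holds a reservation on every replica OSD taking part in the repair, and the machine enters the Recovering state.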

PG::RecoveryState::Recovering::Recovering(my_context ctx)
  : my_base(ctx),
    NamedState(context< RecoveryMachine >().pg->cct, "Started/Primary/Active/Recovering")
{
  context< RecoveryMachine >().log_enter(state_name);

  PG *pg = context< RecoveryMachine >().pg;
  pg->state_clear(PG_STATE_RECOVERY_WAIT);
  pg->state_set(PG_STATE_RECOVERING);
  pg->publish_stats_to_osd();
  pg->osd->queue_for_recovery(pg);
}

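The actual data repair happens in the Recovering state.
Its constructor sets the PG state to PG_STATE_RECOVERING and adds the PG to the recovery_wq work queue, which starts the repair.
Recovery is triggered and controlled by the PG's primary OSD: the primary is repaired first, then the replicas.
In recovery_wq, the thread pool's processing function calls do_recovery to perform the actual repair.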

void _process(PG *pg, ThreadPool::TPHandle &handle) override {
  osd->do_recovery(pg, handle);
  pg->put("RecoveryWQ");
}

ReplicatedPG::start_recovery_ops carries out the repair for replicated PGs.

bool ReplicatedPG::start_recovery_ops(
  int max, ThreadPool::TPHandle &handle,
  int *ops_started)
{
  int& started = *ops_started;
  started = 0;
  bool work_in_progress = false;
  ...
  started = recover_replicas(max, handle);
  }
  if (!started) {
    // We still have missing objects that we should grab from replicas.
    started += recover_primary(max, handle);
  }
  ...

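ReplicatedPG::recover_primary repairs the objects missing on the PG's primary OSD.
When Recovery finishes in the Recovering state and Backfill work is still needed, the state receives a RequestBackfill event and enters the Backfill flow.
If there is no Backfill work, it receives an AllReplicasRecovered event and moves to the Recovered state. Reaching Recovered means data repair is complete; when a GoClean event arrives, the PG enters the clean state.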
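
The Recovery path traced in this section can be collected into one lookup table. The sketch below includes only the (state, event) pairs named in this walkthrough; next_state is a hypothetical helper, and any pair it does not know simply leaves the state unchanged, which is not a claim about how the real reaction lists behave.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// (state, event) -> next state, restricted to transitions named in the text.
std::string next_state(const std::string& state, const std::string& event) {
  static const std::map<std::pair<std::string, std::string>, std::string>
      table = {
          {{"Activating", "DoRecovery"}, "WaitLocalRecoveryReserved"},
          {{"Activating", "RequestBackfill"}, "WaitLocalBackfillReserved"},
          {{"Activating", "AllReplicasRecovered"}, "Recovered"},
          {{"WaitLocalRecoveryReserved", "LocalRecoveryReserved"},
           "WaitRemoteRecoveryReserved"},
          {{"WaitRemoteRecoveryReserved", "AllRemotesReserved"}, "Recovering"},
          {{"Recovering", "AllReplicasRecovered"}, "Recovered"},
          {{"Recovered", "GoClean"}, "Clean"},
      };
  auto it = table.find({state, event});
  return it == table.end() ? state : it->second;  // unknown pairs: stay put
}
```

Feeding the events DoRecovery, LocalRecoveryReserved, AllRemotesReserved, AllReplicasRecovered and GoClean in that order walks a PG from Activating to Clean.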

Backfill

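As with Recovery, if the Primary finds that some replicas can only be repaired through Backfill, it performs Backfill.
Backfill rescans all of the PG's objects, compares them to find the missing ones, and repairs them by copying whole objects. Like Recovery, it requires resource reservations.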
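
The scan-and-compare idea can be illustrated in a few lines. This is a rough sketch under stated assumptions, not the Backfill implementation: both object listings are hypothetical name-to-version maps, objects_to_copy is an invented helper, and treating a version mismatch as "copy the whole object" is an assumption made for illustration.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Returns the objects that would be copied in full to the backfill target:
// those it lacks entirely and (assumption) those whose version differs.
std::vector<std::string> objects_to_copy(
    const std::map<std::string, unsigned>& primary,
    const std::map<std::string, unsigned>& target) {
  std::vector<std::string> out;
  for (const auto& kv : primary) {
    auto it = target.find(kv.first);
    if (it == target.end() || it->second != kv.second) out.push_back(kv.first);
  }
  return out;
}
```

Unlike Recovery, this comparison needs no PG log at all, which is why it still works when a replica's log is not contiguous with the primary's.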
I will not cover this part here...