原文: https://mp.weixin.qq.com/s/CfBrYgchR60G8N5I0USY7w

SPDK Thread 模型设计与实现

Reactor

Reactor – 单个CPU Core抽象,主要包含了:

对象g_reactor_state有五个状态对应了应用中reactors运行运行状态

enum spdk_reactor_state {
    SPDK_REACTOR_STATE_INVALID = 0,
    SPDK_REACTOR_STATE_INITIALIZED = 1,
    SPDK_REACTOR_STATE_RUNNING = 2,
    SPDK_REACTOR_STATE_EXITING = 3,
    SPDK_REACTOR_STATE_SHUTDOWN = 4,
};

初始情况下是:

SPDK_REACTOR_STATE_INVALID状态,在spdk app(任意一个target,比如nvmf_tgt)启动时,即调用了spdk_app_start方法,会调用spdk_reactors_init,在这个方法中将会初始化所有需要被初始化的reactors(可以在配置文件中指定需要使用的Core,CPU Core 和reactor是一对一的)。并且会将g_reactor_state设置为SPDK_REACTOR_STATE_INITIALIZED。具体代码如下:

int spdk_reactors_init(void)
{
    // 初始化所有的event mempool
    g_spdk_event_mempool = spdk_mempool_create(…);
    // 为g_reactors分配内存,g_reactors是一个数组,管理了所有的reactors
    posix_memalign((void **)&g_reactors, 64,  (last_core + 1) * sizeof(struct spdk_reactor));
    // 这里设置了reactor创建线程的方法,之后需要初始化线程的时候将会调用该方法
    spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread));
    // 对于每一个启动的reactor,将会初始化它们
    // 初始化reactor过程,即为绑定lcore,初始化spdk ring、threads,对rusage无操作
    SPDK_ENV_FOREACH_CORE(i) {
        reactor = spdk_reactor_get(i);
        spdk_reactor_construct(reactor, i);
    }
    // 设置好状态返回
    g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
    return 0;
}

在进入SPDK_REACTOR_STATE_INITIALIZED状态且spdk_app_start在创建了自己的线程并绑定到了reactors后,会调用spdk_reactors_start方法并将g_reactor_state设置为SPDK_REACTOR_STATE_RUNNING状态并会创建所有reactor的线程且轮询

void spdk_reactors_start(void) {
    SPDK_ENV_FOREACH_CORE(i) {
        if (i != current_core) { // 在非master reactor中
            reactor = spdk_reactor_get(i); // 得到相应的reactor
            // 设置好线程创建后的一个消息,该消息为轮询函数
            rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor);                             
            // reactor创建好线程并且会自动执行第一个消息
            spdk_thread_create(thread_name, tmp_cpumask);

        }
    }
    // 当前CPU core得到reactor,并且开始轮询
    reactor = spdk_reactor_get(current_core);
    _spdk_reactor_run(reactor);
}

之前提到spdk_reactors_init方法中调用了spdk_thread_lib_init方法传入了创建thread的spdk_reactor_schedule_thread方法,在调用spdk_thread_create会回调该方法。这个方法它主要的功能就是告诉这个新创建的线程绑定创建该线程的reactor。

在SPDK_REACTOR_STATE_RUNNING后,此时所有reactor就进入了轮询状态。_spdk_reactor_run函数为线程提供了轮询方法:

static int _spdk_reactor_run(void * arg) {
  while (1) {
    // 处理reactor上的event消息,消息会在之后讲到
    _spdk_event_queue_run_batch(reactor);
    // 每一个reactor上注册的thread进行遍历并且处理poller事件
    TAILQ_FOREACH_SAFE(lw_thread, & reactor -> threads, link, tmp) {
      rc = spdk_thread_poll(thread, 0, now);
    }
    // 检查reactor的状态
    if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
      break;
    }
  }
}

而当spdk app被调用spdk_app_stop方法后将会相应的通知每一个reactor调用spdk_reactors_stop方法,将g_reactor_state赋值为SPDK_REACTOR_STATE_EXITING,即开始退出了。回到_spdk_reactor_run函数中,轮询将会被跳出,并且执行销毁线程的代码。

在这之后,主线程的_spdk_reactor_run会返回到spdk_reactors_start中,并将g_reactor_state赋值为SPDK_REACTOR_STATE_SHUTDOWN,返回到spdk_app_start中等待应用退出。

reactors和CPU core以及spdk thread关系应该如图所示:

image.png

image.png

Thread

Thread – 线程,但它是spdk抽象出来的线程,主要包含了:

spdk_thread 不是常规意义下的线程,实际是个逻辑上的概念,它没有具体的执行函数,其所有相关的操作均在 reactor 的执行函数中来执行

有了 spdk_thread 后就可以通过注册 spdk_poller 来重复或者周期性的运行某个函数。如果注册 poller 时的周期指定为 0,那么 poller 对应的执行函数就会在每个 reactor 的循环中均进行调用;如果周期不为 0,那各次 reactor 的循环中就会检查是否满足执行的周期时才执行

当Reactors进行轮询时,除了处理自己的事件消息之外,还会调用注册在该reactor下面的每一个线程进行轮询。不过通常一个reactor只有一个thread,在spdk应用中,更多的是注册多个poller而不是注册多个thread。具体的轮询方法为:

Int spdk_thread_poll(struct spdk_thread * thread, uint32_t max_msgs, uint64_t now) {
  // 首先先处理ring传递过来的消息
  msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
  // 调用非定时poller中的方法
  TAILQ_FOREACH_REVERSE_SAFE(poller, & thread -> active_pollers,
    active_pollers_head, tailq, tmp) {
    // 调用poller注册的方法之前,会对poller状态检测且转换
    if (poller -> state == SPDK_POLLER_STATE_UNREGISTERED) {
      TAILQ_REMOVE( & thread -> active_pollers, poller, tailq);
      free(poller);
      continue;
    }
    poller -> state = SPDK_POLLER_STATE_RUNNING;
    // 调用poller注册的方法
    poller_rc = poller -> fn(poller -> arg);
    // poller转换状态
    poller -> state = SPDK_POLLER_STATE_WAITING;
  }
  // 调用定时poller中的方法
  TAILQ_FOREACH_SAFE(poller, & thread -> timer_pollers, tailq, tmp) {
    // 类似非定时poller过程,不过会检查是否到了预定的时间
    if (now < poller -> next_run_tick) break;
  }
  // 最后统计时间
}

Io_device 和 io_channel在thread中也是非常重要的概念。它们的实现都在thread.c中,io_device是设备的抽象,io_channel是对该设备通道的抽象。一个线程可以创建多个io_channel . io_channel只能和一个io_device绑定,并且这个io_channel是别的线程使用不了的。

image.png