テストステ論

高テス協会会長が, テストステロンに関する情報をお届けします.

device-mapperの仕組み (5) dm_ioでI/Oを簡単に発行しよう!

device-mapperのターゲットを実装する時, 単にbioの行き先を変更するだけでなく, その途中でI/Oを発行したいことがしばしばあります. 例えば, キャッシュのメタデータを書き出すなどと言った処理です. filesystemではこのような処理は, submit_bioというgeneric_make_requestというラッパーを利用して実装されていることが多いですが, device-mapperフレームワークはより抽象的で使いやすい関数を提供しています. それがdm_ioです.

以前に, LinuxのブロックI/Oは非同期しかサポートしていないということを述べましたが, dm_ioは, 同期I/Oをサポートします. その仕組みは非同期でI/Oを出したのち眠ってしまい, endioでwakeup_processしてもらうという仕組みです. 以下のコードを見るように, io_req->notify.fnという非同期I/Oのコールバック関数がNULLであれば同期I/Oになるという仕組みです.

dm_ioは, where配列で指定された複数領域に対してwriteを投げることが出来ます(readはnum_regions=1が強制されます). dm_io_requestというのは, 「I/Oの素」であると考えると良いでしょう. dpagesというのは, 「I/O用バッファの供給元」です. dm_io_requestには, 「bioのフラグ」「どうやってバッファを用意するのか」「コールバック関数はどうするのか」という情報が詰め込まれています. dpagesは, さまざまなメモリタイプの領域からバッファを確保することが出来て, その種類はdm_io_mem_typeに書いてあります. 構造体を適切に設定し, dm_ioに対して入力することにより, I/O処理が行われます.
なお, コメント部分には「発行したあとにblk_unplug()するか, REQ_SYNCを設定しろ」と書いてあります. 逆らう明確な理由がない限りは素直につけましょう. REQ_SYNCは名前が紛らわしいですが, これをつけたからといって同期I/Oになるというものではありません. REQ_SYNCフラグはあくまでも, I/Oスケジューラに対する「なるべく早くI/O完了させてね」というヒント情報でしかなく*1, CFQスケジューラでしか使われていません. 「新しいインターフェイス」と書いてありますが, 2.6.30くらいからはあるので, さほど新しくはありません.

/*
 * New collapsed (a)synchronous interface.
 *
 * If the IO is asynchronous (i.e. it has notify.fn), you must either unplug
 * the queue with blk_unplug() some time later or set REQ_SYNC in
io_req->bi_rw. If you fail to do one of these, the IO will be submitted to
 * the disk after q->unplug_delay, which defaults to 3ms in blk-settings.c.
 */
int dm_io(struct dm_io_request *io_req, unsigned num_regions,
          struct dm_io_region *where, unsigned long *sync_error_bits)
{
        int r;
        struct dpages dp;

        r = dp_init(io_req, &dp, (unsigned long)where->count << SECTOR_SHIFT);
        if (r)
                return r;

        if (!io_req->notify.fn)
                return sync_io(io_req->client, num_regions, where,
                               io_req->bi_rw, &dp, sync_error_bits);

        return async_io(io_req->client, num_regions, where, io_req->bi_rw,
                        &dp, io_req->notify.fn, io_req->notify.context);
}
EXPORT_SYMBOL(dm_io);

dm_io関数に関連した構造体

struct dm_io_request {
        int bi_rw; /* READ|WRITE - not READA */
        struct dm_io_memory mem; /* Memory to use for io */
        struct dm_io_notify notify; /* Synchronous if notify.fn is NULL */
        struct dm_io_client *client; /* Client memory handler */
};

enum dm_io_mem_type {
        DM_IO_PAGE_LIST,/* Page list */
        DM_IO_BVEC, /* Bio vector */
        DM_IO_VMA, /* Virtual memory area */
        DM_IO_KMEM, /* Kernel memory */
};

struct dm_io_notify {
        io_notify_fn fn; /* Callback for asynchronous requests */
        void *context; /* Passed to callback */
};
typedef void (*io_notify_fn)(unsigned long error, void *context);

struct dm_io_region {
        struct block_device *bdev;
        sector_t sector;
        sector_t count; /* If this is zero the region is ignored. */
};

続いて, dm_ioからコードを辿ってみましょう. dm_ioも, 仮想デバイスのdm_requestと同様に, I/O発行数をカウントして, すべてが終わったらコールバックを呼ぶという仕組みになっています.

sync_io (説明のため簡略化)

static int sync_io(struct dm_io_client *client, unsigned int num_regions,
                   struct dm_io_region *where, int rw, struct dpages *dp,
                   unsigned long *error_bits)
{
        // readはnum_regionsが1でお願いします. そうしなければ警告して-EIO        
        if (num_regions > 1 && (rw & RW_MASK) != WRITE) {
                WARN_ON(1);
                return -EIO;
        }

        io->error_bits = 0;
        atomic_set(&io->count, 1); /* see dispatch_io() */
        io->sleeper = current; // あとで起こしてもらうプロセスを設定します.
        io->client = client;

        dispatch_io(rw, num_regions, where, dp, io, 1);

        while (1) {
                set_current_state(TASK_UNINTERRUPTIBLE);
                // もし, I/Oが速すぎてこの時点でio->countが0になっていたら眠ってはならない.
                if (!atomic_read(&io->count))
                        break;

                io_schedule(); // 寝ます.
        }
        set_current_state(TASK_RUNNING);

dispatch_io. 複数のI/Oを発行します. io->countをダミー的にincしているのは, 以下のようなロジックを回避するためです*2. 他にも実装はあると思いますが, とてもシンプルな実装です. 鬱陶しいコーナーケースを, このようにダミーを設定することによって回避するという手法は, プログラミングの一般的な技法です. こうすることで, コーナーケースへの処理をコード全体に広けずに済んでいるのです*3.
1. 本当は2個発行したいです.
2. 一個発行しました(incしてsubmit_bioした).
3. 二個目を発行しようとしています.
4. しかし, incする前に1個目がcompletionして, decされました.
5. 2個目が発行されてすらいないのにI/Oがすべて完了したように見えてしまいました.
6. 設定されたコールバック関数が呼ばれてしまいました.

static void dispatch_io(int rw, unsigned int num_regions,
                        struct dm_io_region *where, struct dpages *dp,
                        struct io *io, int sync)
{
        int i;
        struct dpages old_pages = *dp;

        BUG_ON(num_regions > DM_IO_MAX_REGIONS);

        if (sync)
                rw |= REQ_SYNC;

        /*
         * For multiple regions we need to be careful to rewind
         * the dp object for each call to do_region.
         */
        for (i = 0; i < num_regions; i++) {
                *dp = old_pages;
                if (where[i].count || (rw & REQ_FLUSH))
                        do_region(rw, i, where + i, dp, io);
        }

        /*
         * Drop the extra reference that we were holding to avoid
         * the io being completed too early.
         */
        dec_count(io, 0, 0);
}

do_region(抜粋). 小さなbioを発行しているだけです. コールバックはendioという関数です.

do {
                bio = bio_alloc_bioset(GFP_NOIO, num_bvecs, io->client->bios);
                bio->bi_sector = where->sector + (where->count - remaining);
                bio->bi_bdev = where->bdev;
                bio->bi_end_io = endio;
                // 略
                atomic_inc(&io->count);
                submit_bio(rw, bio); // I/Oを実際に発行する.
} while (remaining);

小さなbioのコールバックであるendioは, dec_countを呼びます.

dec_countは, io->countが0になると, 同期I/Oの場合はsleeper(currentが設定済)を叩き起こします. 非同期I/Oの場合は, コールバック関数(io_notify_fn)を設定されたcontextを引数として呼びます.

static void dec_count(struct io *io, unsigned int region, int error)
{
        if (error)
                set_bit(region, &io->error_bits);

        if (atomic_dec_and_test(&io->count)) {
                if (io->vma_invalidate_size)
                        invalidate_kernel_vmap_range(io->vma_invalidate_address,
                                                     io->vma_invalidate_size);

                if (io->sleeper)
                        wake_up_process(io->sleeper);

                else {
                        unsigned long r = io->error_bits;
                        io_notify_fn fn = io->callback;
                        void *context = io->context;

                        mempool_free(io, io->client->pool);
                        fn(r, context);
                }
        }
}

以上です. 今回は, device-mapperが用意しているI/O発行関数であるdm_ioについて説明をしました. これらのコードには, LinuxにおいてブロックデバイスにI/Oを発行するためのエッセンスが詰まっていると思います. dm_ioは素晴らしいAPIですが, 使い方に注意が必要な場面があります. 次回は, このdm_ioを安易に使うとデッドロックを起こすという話をします.

*1:not a few people misunderstand it

*2:実際に発行するかは, num_regionsで与えた数と一致しません. APIの上から1つでも, 実際には2個発行されるかも知れません. 実際に計算しなければ分からないためこのようなことになってしまっています

*3:dm-lcではこのようなテクニックが多様されているため, コードがシンプルになっています