dm: optimize by using SRCU and RCU

This patch removes "io_lock" and "map_lock" from struct mapped_device and
"holders" from struct dm_table, and replaces these mechanisms with
sleepable RCU (SRCU).

Previously, the code would call "dm_get_live_table" and "dm_table_put" to
get and release the table. Now, the code is changed to call
"dm_get_live_table" and "dm_put_live_table". dm_get_live_table takes the
sleepable-RCU (SRCU) read lock and dm_put_live_table releases it.

dm_get_live_table_fast/dm_put_live_table_fast can be used instead of
dm_get_live_table/dm_put_live_table. These *_fast functions use
non-sleepable RCU, so the caller must not block between them.

If the code changes the active or inactive dm table, it must call
dm_sync_table before destroying the old table.

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Signed-off-by: Jun'ichi Nomura <j-nomura@ce.jp.nec.com>
Signed-off-by: Alasdair G Kergon <agk@redhat.com>
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index 33f2010..ecff83f 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -117,12 +117,19 @@
 #define DMF_MERGE_IS_OPTIONAL 6
 
 /*
+ * A dummy definition to make RCU happy.
+ * struct dm_table should never be dereferenced in this file.
+ */
+struct dm_table {
+	int undefined__;
+};
+
+/*
  * Work processed by per-device workqueue.
  */
 struct mapped_device {
-	struct rw_semaphore io_lock;
+	struct srcu_struct io_barrier;
 	struct mutex suspend_lock;
-	rwlock_t map_lock;
 	atomic_t holders;
 	atomic_t open_count;
 
@@ -156,6 +163,8 @@
 
 	/*
 	 * The current mapping.
+	 * Use dm_get_live_table{_fast} or take suspend_lock for
+	 * dereference.
 	 */
 	struct dm_table *map;
 
@@ -386,12 +395,14 @@
 			unsigned int cmd, unsigned long arg)
 {
 	struct mapped_device *md = bdev->bd_disk->private_data;
+	int srcu_idx;
 	struct dm_table *map;
 	struct dm_target *tgt;
 	int r = -ENOTTY;
 
 retry:
-	map = dm_get_live_table(md);
+	map = dm_get_live_table(md, &srcu_idx);
+
 	if (!map || !dm_table_get_size(map))
 		goto out;
 
@@ -410,7 +421,7 @@
 		r = tgt->type->ioctl(tgt, cmd, arg);
 
 out:
-	dm_table_put(map);
+	dm_put_live_table(md, srcu_idx);
 
 	if (r == -ENOTCONN) {
 		msleep(10);
@@ -509,20 +520,39 @@
 /*
  * Everyone (including functions in this file), should use this
  * function to access the md->map field, and make sure they call
- * dm_table_put() when finished.
+ * dm_put_live_table() when finished.
  */
-struct dm_table *dm_get_live_table(struct mapped_device *md)
+struct dm_table *dm_get_live_table(struct mapped_device *md, int *srcu_idx) __acquires(md->io_barrier)
 {
-	struct dm_table *t;
-	unsigned long flags;
+	*srcu_idx = srcu_read_lock(&md->io_barrier);
 
-	read_lock_irqsave(&md->map_lock, flags);
-	t = md->map;
-	if (t)
-		dm_table_get(t);
-	read_unlock_irqrestore(&md->map_lock, flags);
+	return srcu_dereference(md->map, &md->io_barrier);
+}
 
-	return t;
+void dm_put_live_table(struct mapped_device *md, int srcu_idx) __releases(md->io_barrier)
+{
+	srcu_read_unlock(&md->io_barrier, srcu_idx);
+}
+
+void dm_sync_table(struct mapped_device *md)
+{
+	synchronize_srcu(&md->io_barrier);
+	synchronize_rcu_expedited();
+}
+
+/*
+ * A fast alternative to dm_get_live_table/dm_put_live_table.
+ * The caller must not block between these two functions.
+ */
+static struct dm_table *dm_get_live_table_fast(struct mapped_device *md) __acquires(RCU)
+{
+	rcu_read_lock();
+	return rcu_dereference(md->map);
+}
+
+static void dm_put_live_table_fast(struct mapped_device *md) __releases(RCU)
+{
+	rcu_read_unlock();
 }
 
 /*
@@ -1356,17 +1386,18 @@
 /*
  * Entry point to split a bio into clones and submit them to the targets.
  */
-static void __split_and_process_bio(struct mapped_device *md, struct bio *bio)
+static void __split_and_process_bio(struct mapped_device *md,
+				    struct dm_table *map, struct bio *bio)
 {
 	struct clone_info ci;
 	int error = 0;
 
-	ci.map = dm_get_live_table(md);
-	if (unlikely(!ci.map)) {
+	if (unlikely(!map)) {
 		bio_io_error(bio);
 		return;
 	}
 
+	ci.map = map;
 	ci.md = md;
 	ci.io = alloc_io(md);
 	ci.io->error = 0;
@@ -1393,7 +1424,6 @@
 
 	/* drop the extra reference count */
 	dec_pending(ci.io, error);
-	dm_table_put(ci.map);
 }
 /*-----------------------------------------------------------------
  * CRUD END
@@ -1404,7 +1434,7 @@
 			 struct bio_vec *biovec)
 {
 	struct mapped_device *md = q->queuedata;
-	struct dm_table *map = dm_get_live_table(md);
+	struct dm_table *map = dm_get_live_table_fast(md);
 	struct dm_target *ti;
 	sector_t max_sectors;
 	int max_size = 0;
@@ -1414,7 +1444,7 @@
 
 	ti = dm_table_find_target(map, bvm->bi_sector);
 	if (!dm_target_is_valid(ti))
-		goto out_table;
+		goto out;
 
 	/*
 	 * Find maximum amount of I/O that won't need splitting
@@ -1443,10 +1473,8 @@
 
 		max_size = 0;
 
-out_table:
-	dm_table_put(map);
-
 out:
+	dm_put_live_table_fast(md);
 	/*
 	 * Always allow an entire first page
 	 */
@@ -1465,8 +1493,10 @@
 	int rw = bio_data_dir(bio);
 	struct mapped_device *md = q->queuedata;
 	int cpu;
+	int srcu_idx;
+	struct dm_table *map;
 
-	down_read(&md->io_lock);
+	map = dm_get_live_table(md, &srcu_idx);
 
 	cpu = part_stat_lock();
 	part_stat_inc(cpu, &dm_disk(md)->part0, ios[rw]);
@@ -1475,7 +1505,7 @@
 
 	/* if we're suspended, we have to queue this io for later */
 	if (unlikely(test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags))) {
-		up_read(&md->io_lock);
+		dm_put_live_table(md, srcu_idx);
 
 		if (bio_rw(bio) != READA)
 			queue_io(md, bio);
@@ -1484,8 +1514,8 @@
 		return;
 	}
 
-	__split_and_process_bio(md, bio);
-	up_read(&md->io_lock);
+	__split_and_process_bio(md, map, bio);
+	dm_put_live_table(md, srcu_idx);
 	return;
 }
 
@@ -1671,7 +1701,8 @@
 static void dm_request_fn(struct request_queue *q)
 {
 	struct mapped_device *md = q->queuedata;
-	struct dm_table *map = dm_get_live_table(md);
+	int srcu_idx;
+	struct dm_table *map = dm_get_live_table(md, &srcu_idx);
 	struct dm_target *ti;
 	struct request *rq, *clone;
 	sector_t pos;
@@ -1726,7 +1757,7 @@
 delay_and_out:
 	blk_delay_queue(q, HZ / 10);
 out:
-	dm_table_put(map);
+	dm_put_live_table(md, srcu_idx);
 }
 
 int dm_underlying_device_busy(struct request_queue *q)
@@ -1739,14 +1770,14 @@
 {
 	int r;
 	struct mapped_device *md = q->queuedata;
-	struct dm_table *map = dm_get_live_table(md);
+	struct dm_table *map = dm_get_live_table_fast(md);
 
 	if (!map || test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags))
 		r = 1;
 	else
 		r = dm_table_any_busy_target(map);
 
-	dm_table_put(map);
+	dm_put_live_table_fast(md);
 
 	return r;
 }
@@ -1758,7 +1789,7 @@
 	struct dm_table *map;
 
 	if (!test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags)) {
-		map = dm_get_live_table(md);
+		map = dm_get_live_table_fast(md);
 		if (map) {
 			/*
 			 * Request-based dm cares about only own queue for
@@ -1769,9 +1800,8 @@
 				    bdi_bits;
 			else
 				r = dm_table_any_congested(map, bdi_bits);
-
-			dm_table_put(map);
 		}
+		dm_put_live_table_fast(md);
 	}
 
 	return r;
@@ -1876,12 +1906,14 @@
 	if (r < 0)
 		goto bad_minor;
 
+	r = init_srcu_struct(&md->io_barrier);
+	if (r < 0)
+		goto bad_io_barrier;
+
 	md->type = DM_TYPE_NONE;
-	init_rwsem(&md->io_lock);
 	mutex_init(&md->suspend_lock);
 	mutex_init(&md->type_lock);
 	spin_lock_init(&md->deferred_lock);
-	rwlock_init(&md->map_lock);
 	atomic_set(&md->holders, 1);
 	atomic_set(&md->open_count, 0);
 	atomic_set(&md->event_nr, 0);
@@ -1944,6 +1976,8 @@
 bad_disk:
 	blk_cleanup_queue(md->queue);
 bad_queue:
+	cleanup_srcu_struct(&md->io_barrier);
+bad_io_barrier:
 	free_minor(minor);
 bad_minor:
 	module_put(THIS_MODULE);
@@ -1967,6 +2001,7 @@
 		bioset_free(md->bs);
 	blk_integrity_unregister(md->disk);
 	del_gendisk(md->disk);
+	cleanup_srcu_struct(&md->io_barrier);
 	free_minor(minor);
 
 	spin_lock(&_minor_lock);
@@ -2109,7 +2144,6 @@
 	struct dm_table *old_map;
 	struct request_queue *q = md->queue;
 	sector_t size;
-	unsigned long flags;
 	int merge_is_optional;
 
 	size = dm_table_get_size(t);
@@ -2138,9 +2172,8 @@
 
 	merge_is_optional = dm_table_merge_is_optional(t);
 
-	write_lock_irqsave(&md->map_lock, flags);
 	old_map = md->map;
-	md->map = t;
+	rcu_assign_pointer(md->map, t);
 	md->immutable_target_type = dm_table_get_immutable_target_type(t);
 
 	dm_table_set_restrictions(t, q, limits);
@@ -2148,7 +2181,7 @@
 		set_bit(DMF_MERGE_IS_OPTIONAL, &md->flags);
 	else
 		clear_bit(DMF_MERGE_IS_OPTIONAL, &md->flags);
-	write_unlock_irqrestore(&md->map_lock, flags);
+	dm_sync_table(md);
 
 	return old_map;
 }
@@ -2159,15 +2192,13 @@
 static struct dm_table *__unbind(struct mapped_device *md)
 {
 	struct dm_table *map = md->map;
-	unsigned long flags;
 
 	if (!map)
 		return NULL;
 
 	dm_table_event_callback(map, NULL, NULL);
-	write_lock_irqsave(&md->map_lock, flags);
-	md->map = NULL;
-	write_unlock_irqrestore(&md->map_lock, flags);
+	rcu_assign_pointer(md->map, NULL);
+	dm_sync_table(md);
 
 	return map;
 }
@@ -2319,11 +2350,12 @@
 static void __dm_destroy(struct mapped_device *md, bool wait)
 {
 	struct dm_table *map;
+	int srcu_idx;
 
 	might_sleep();
 
 	spin_lock(&_minor_lock);
-	map = dm_get_live_table(md);
+	map = dm_get_live_table(md, &srcu_idx);
 	idr_replace(&_minor_idr, MINOR_ALLOCED, MINOR(disk_devt(dm_disk(md))));
 	set_bit(DMF_FREEING, &md->flags);
 	spin_unlock(&_minor_lock);
@@ -2333,6 +2365,9 @@
 		dm_table_postsuspend_targets(map);
 	}
 
+	/* dm_put_live_table must be before msleep, otherwise deadlock is possible */
+	dm_put_live_table(md, srcu_idx);
+
 	/*
 	 * Rare, but there may be I/O requests still going to complete,
 	 * for example.  Wait for all references to disappear.
@@ -2347,7 +2382,6 @@
 		       dm_device_name(md), atomic_read(&md->holders));
 
 	dm_sysfs_exit(md);
-	dm_table_put(map);
 	dm_table_destroy(__unbind(md));
 	free_dev(md);
 }
@@ -2404,8 +2438,10 @@
 	struct mapped_device *md = container_of(work, struct mapped_device,
 						work);
 	struct bio *c;
+	int srcu_idx;
+	struct dm_table *map;
 
-	down_read(&md->io_lock);
+	map = dm_get_live_table(md, &srcu_idx);
 
 	while (!test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags)) {
 		spin_lock_irq(&md->deferred_lock);
@@ -2415,17 +2451,13 @@
 		if (!c)
 			break;
 
-		up_read(&md->io_lock);
-
 		if (dm_request_based(md))
 			generic_make_request(c);
 		else
-			__split_and_process_bio(md, c);
-
-		down_read(&md->io_lock);
+			__split_and_process_bio(md, map, c);
 	}
 
-	up_read(&md->io_lock);
+	dm_put_live_table(md, srcu_idx);
 }
 
 static void dm_queue_flush(struct mapped_device *md)
@@ -2457,10 +2489,10 @@
 	 * reappear.
 	 */
 	if (dm_table_has_no_data_devices(table)) {
-		live_map = dm_get_live_table(md);
+		live_map = dm_get_live_table_fast(md);
 		if (live_map)
 			limits = md->queue->limits;
-		dm_table_put(live_map);
+		dm_put_live_table_fast(md);
 	}
 
 	if (!live_map) {
@@ -2540,7 +2572,7 @@
 		goto out_unlock;
 	}
 
-	map = dm_get_live_table(md);
+	map = md->map;
 
 	/*
 	 * DMF_NOFLUSH_SUSPENDING must be set before presuspend.
@@ -2561,7 +2593,7 @@
 	if (!noflush && do_lockfs) {
 		r = lock_fs(md);
 		if (r)
-			goto out;
+			goto out_unlock;
 	}
 
 	/*
@@ -2576,9 +2608,8 @@
 	 * (dm_wq_work), we set BMF_BLOCK_IO_FOR_SUSPEND and call
 	 * flush_workqueue(md->wq).
 	 */
-	down_write(&md->io_lock);
 	set_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags);
-	up_write(&md->io_lock);
+	synchronize_srcu(&md->io_barrier);
 
 	/*
 	 * Stop md->queue before flushing md->wq in case request-based
@@ -2596,10 +2627,9 @@
 	 */
 	r = dm_wait_for_completion(md, TASK_INTERRUPTIBLE);
 
-	down_write(&md->io_lock);
 	if (noflush)
 		clear_bit(DMF_NOFLUSH_SUSPENDING, &md->flags);
-	up_write(&md->io_lock);
+	synchronize_srcu(&md->io_barrier);
 
 	/* were we interrupted ? */
 	if (r < 0) {
@@ -2609,7 +2639,7 @@
 			start_queue(md->queue);
 
 		unlock_fs(md);
-		goto out; /* pushback list is already flushed, so skip flush */
+		goto out_unlock; /* pushback list is already flushed, so skip flush */
 	}
 
 	/*
@@ -2622,9 +2652,6 @@
 
 	dm_table_postsuspend_targets(map);
 
-out:
-	dm_table_put(map);
-
 out_unlock:
 	mutex_unlock(&md->suspend_lock);
 	return r;
@@ -2639,7 +2666,7 @@
 	if (!dm_suspended_md(md))
 		goto out;
 
-	map = dm_get_live_table(md);
+	map = md->map;
 	if (!map || !dm_table_get_size(map))
 		goto out;
 
@@ -2663,7 +2690,6 @@
 
 	r = 0;
 out:
-	dm_table_put(map);
 	mutex_unlock(&md->suspend_lock);
 
 	return r;