NFSv4.1: Fix bulk recall and destroy of layouts

The current code in pnfs_destroy_all_layouts() assumes that removing
the layout from the server->layouts list is sufficient to make it
invisible to other processes. This ignores the fact that most
users access the layout through the nfs_inode->layout...
There is further breakage due to lack of reference counting of the
layouts, meaning that the whole thing Oopses at the drop of a hat.

The code in initiate_bulk_draining() is almost correct, and can be
used as a model for pnfs_destroy_all_layouts(), so move that
code to pnfs.c, and refactor the code to allow us to choose between
a single filesystem bulk recall, and a recall of all layouts.
Also note that initiate_bulk_draining() currently calls iput() while
holding locks. Fix that too.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Cc: stable@vger.kernel.org
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index d00260b..6be70f6 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -505,6 +505,136 @@
 }
 EXPORT_SYMBOL_GPL(pnfs_destroy_layout);
 
+static bool
+pnfs_layout_add_bulk_destroy_list(struct inode *inode,
+		struct list_head *layout_list)
+{
+	struct pnfs_layout_hdr *lo;
+	bool ret = false;
+
+	spin_lock(&inode->i_lock);
+	lo = NFS_I(inode)->layout;
+	if (lo != NULL && list_empty(&lo->plh_bulk_destroy)) {
+		pnfs_get_layout_hdr(lo);
+		list_add(&lo->plh_bulk_destroy, layout_list);
+		ret = true;
+	}
+	spin_unlock(&inode->i_lock);
+	return ret;
+}
+
+/* Caller must hold rcu_read_lock and clp->cl_lock */
+static int
+pnfs_layout_bulk_destroy_byserver_locked(struct nfs_client *clp,
+		struct nfs_server *server,
+		struct list_head *layout_list)
+{
+	struct pnfs_layout_hdr *lo, *next;
+	struct inode *inode;
+
+	list_for_each_entry_safe(lo, next, &server->layouts, plh_layouts) {
+		inode = igrab(lo->plh_inode);
+		if (inode == NULL)
+			continue;
+		list_del_init(&lo->plh_layouts);
+		if (pnfs_layout_add_bulk_destroy_list(inode, layout_list))
+			continue;
+		rcu_read_unlock();
+		spin_unlock(&clp->cl_lock);
+		iput(inode);
+		spin_lock(&clp->cl_lock);
+		rcu_read_lock();
+		return -EAGAIN;
+	}
+	return 0;
+}
+
+static int
+pnfs_layout_free_bulk_destroy_list(struct list_head *layout_list,
+		bool is_bulk_recall)
+{
+	struct pnfs_layout_hdr *lo;
+	struct inode *inode;
+	struct pnfs_layout_range range = {
+		.iomode = IOMODE_ANY,
+		.offset = 0,
+		.length = NFS4_MAX_UINT64,
+	};
+	LIST_HEAD(lseg_list);
+	int ret = 0;
+
+	while (!list_empty(layout_list)) {
+		lo = list_entry(layout_list->next, struct pnfs_layout_hdr,
+				plh_bulk_destroy);
+		dprintk("%s freeing layout for inode %lu\n", __func__,
+			lo->plh_inode->i_ino);
+		inode = lo->plh_inode;
+		spin_lock(&inode->i_lock);
+		list_del_init(&lo->plh_bulk_destroy);
+		lo->plh_block_lgets++; /* permanently block new LAYOUTGETs */
+		if (is_bulk_recall)
+			set_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
+		if (pnfs_mark_matching_lsegs_invalid(lo, &lseg_list, &range))
+			ret = -EAGAIN;
+		spin_unlock(&inode->i_lock);
+		pnfs_free_lseg_list(&lseg_list);
+		pnfs_put_layout_hdr(lo);
+		iput(inode);
+	}
+	return ret;
+}
+
+int
+pnfs_destroy_layouts_byfsid(struct nfs_client *clp,
+		struct nfs_fsid *fsid,
+		bool is_recall)
+{
+	struct nfs_server *server;
+	LIST_HEAD(layout_list);
+
+	spin_lock(&clp->cl_lock);
+	rcu_read_lock();
+restart:
+	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
+		if (memcmp(&server->fsid, fsid, sizeof(*fsid)) != 0)
+			continue;
+		if (pnfs_layout_bulk_destroy_byserver_locked(clp,
+				server,
+				&layout_list) != 0)
+			goto restart;
+	}
+	rcu_read_unlock();
+	spin_unlock(&clp->cl_lock);
+
+	if (list_empty(&layout_list))
+		return 0;
+	return pnfs_layout_free_bulk_destroy_list(&layout_list, is_recall);
+}
+
+int
+pnfs_destroy_layouts_byclid(struct nfs_client *clp,
+		bool is_recall)
+{
+	struct nfs_server *server;
+	LIST_HEAD(layout_list);
+
+	spin_lock(&clp->cl_lock);
+	rcu_read_lock();
+restart:
+	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
+		if (pnfs_layout_bulk_destroy_byserver_locked(clp,
+					server,
+					&layout_list) != 0)
+			goto restart;
+	}
+	rcu_read_unlock();
+	spin_unlock(&clp->cl_lock);
+
+	if (list_empty(&layout_list))
+		return 0;
+	return pnfs_layout_free_bulk_destroy_list(&layout_list, is_recall);
+}
+
 /*
  * Called by the state manger to remove all layouts established under an
  * expired lease.
@@ -512,30 +642,10 @@
 void
 pnfs_destroy_all_layouts(struct nfs_client *clp)
 {
-	struct nfs_server *server;
-	struct pnfs_layout_hdr *lo;
-	LIST_HEAD(tmp_list);
-
 	nfs4_deviceid_mark_client_invalid(clp);
 	nfs4_deviceid_purge_client(clp);
 
-	spin_lock(&clp->cl_lock);
-	rcu_read_lock();
-	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		if (!list_empty(&server->layouts))
-			list_splice_init(&server->layouts, &tmp_list);
-	}
-	rcu_read_unlock();
-	spin_unlock(&clp->cl_lock);
-
-	while (!list_empty(&tmp_list)) {
-		lo = list_entry(tmp_list.next, struct pnfs_layout_hdr,
-				plh_layouts);
-		dprintk("%s freeing layout for inode %lu\n", __func__,
-			lo->plh_inode->i_ino);
-		list_del_init(&lo->plh_layouts);
-		pnfs_destroy_layout(NFS_I(lo->plh_inode));
-	}
+	pnfs_destroy_layouts_byclid(clp, false);
 }
 
 /*
@@ -888,7 +998,7 @@
 	atomic_set(&lo->plh_refcount, 1);
 	INIT_LIST_HEAD(&lo->plh_layouts);
 	INIT_LIST_HEAD(&lo->plh_segs);
-	INIT_LIST_HEAD(&lo->plh_bulk_recall);
+	INIT_LIST_HEAD(&lo->plh_bulk_destroy);
 	lo->plh_inode = ino;
 	lo->plh_lc_cred = get_rpccred(ctx->state->owner->so_cred);
 	return lo;