FS-Cache: Start processing an object's operations on that object's death

Start processing an object's operations when that object moves into the DYING
state as the object cannot be destroyed until all its outstanding operations
have completed.

Furthermore, make sure that read and allocation operations handle being woken
up on a dead object.  Such events are recorded in the Allocs.abt and
Retrvls.abt statistics as viewable through /proc/fs/fscache/stats.

The code for waiting for object activation for the read and allocation
operations is also extracted into its own function as it is much the same in
all cases, differing only in the stats incremented.

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/fs/fscache/internal.h b/fs/fscache/internal.h
index 2bf463d..5b49a37 100644
--- a/fs/fscache/internal.h
+++ b/fs/fscache/internal.h
@@ -156,6 +156,7 @@
 extern atomic_t fscache_n_allocs_wait;
 extern atomic_t fscache_n_allocs_nobufs;
 extern atomic_t fscache_n_allocs_intr;
+extern atomic_t fscache_n_allocs_object_dead;
 extern atomic_t fscache_n_alloc_ops;
 extern atomic_t fscache_n_alloc_op_waits;
 
@@ -166,6 +167,7 @@
 extern atomic_t fscache_n_retrievals_nobufs;
 extern atomic_t fscache_n_retrievals_intr;
 extern atomic_t fscache_n_retrievals_nomem;
+extern atomic_t fscache_n_retrievals_object_dead;
 extern atomic_t fscache_n_retrieval_ops;
 extern atomic_t fscache_n_retrieval_op_waits;
 
@@ -249,9 +251,12 @@
 	atomic_dec(stat);
 }
 
+#define __fscache_stat(stat) (stat)
+
 extern const struct file_operations fscache_stats_fops;
 #else
 
+#define __fscache_stat(stat) (NULL)
 #define fscache_stat(stat) do {} while (0)
 #endif
 
diff --git a/fs/fscache/object.c b/fs/fscache/object.c
index 74bc562..c85c9f5 100644
--- a/fs/fscache/object.c
+++ b/fs/fscache/object.c
@@ -201,6 +201,7 @@
 		}
 		spin_unlock(&object->lock);
 		fscache_enqueue_dependents(object);
+		fscache_start_operations(object);
 		goto terminal_transit;
 
 		/* handle an abort during initialisation */
diff --git a/fs/fscache/page.c b/fs/fscache/page.c
index fc76798..c598ea4 100644
--- a/fs/fscache/page.c
+++ b/fs/fscache/page.c
@@ -314,6 +314,43 @@
 }
 
 /*
+ * wait for an object to become active (or dead)
+ */
+static int fscache_wait_for_retrieval_activation(struct fscache_object *object,
+						 struct fscache_retrieval *op,
+						 atomic_t *stat_op_waits,
+						 atomic_t *stat_object_dead)
+{
+	int ret;
+
+	if (!test_bit(FSCACHE_OP_WAITING, &op->op.flags))
+		goto check_if_dead;
+
+	_debug(">>> WT");
+	fscache_stat(stat_op_waits);
+	if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
+			fscache_wait_bit_interruptible,
+			TASK_INTERRUPTIBLE) < 0) {
+		ret = fscache_cancel_op(&op->op);
+		if (ret == 0)
+			return -ERESTARTSYS;
+
+		/* it's been removed from the pending queue by another party,
+		 * so we should get to run shortly */
+		wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
+			    fscache_wait_bit, TASK_UNINTERRUPTIBLE);
+	}
+	_debug("<<< GO");
+
+check_if_dead:
+	if (unlikely(fscache_object_is_dead(object))) {
+		fscache_stat(stat_object_dead);
+		return -ENOBUFS;
+	}
+	return 0;
+}
+
+/*
  * read a page from the cache or allocate a block in which to store it
  * - we return:
  *   -ENOMEM	- out of memory, nothing done
@@ -376,25 +413,12 @@
 
 	/* we wait for the operation to become active, and then process it
 	 * *here*, in this thread, and not in the thread pool */
-	if (test_bit(FSCACHE_OP_WAITING, &op->op.flags)) {
-		_debug(">>> WT");
-		fscache_stat(&fscache_n_retrieval_op_waits);
-		if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
-				fscache_wait_bit_interruptible,
-				TASK_INTERRUPTIBLE) < 0) {
-			ret = fscache_cancel_op(&op->op);
-			if (ret == 0) {
-				ret = -ERESTARTSYS;
-				goto error;
-			}
-
-			/* it's been removed from the pending queue by another
-			 * party, so we should get to run shortly */
-			wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
-				    fscache_wait_bit, TASK_UNINTERRUPTIBLE);
-		}
-		_debug("<<< GO");
-	}
+	ret = fscache_wait_for_retrieval_activation(
+		object, op,
+		__fscache_stat(&fscache_n_retrieval_op_waits),
+		__fscache_stat(&fscache_n_retrievals_object_dead));
+	if (ret < 0)
+		goto error;
 
 	/* ask the cache to honour the operation */
 	if (test_bit(FSCACHE_COOKIE_NO_DATA_YET, &object->cookie->flags)) {
@@ -506,25 +530,12 @@
 
 	/* we wait for the operation to become active, and then process it
 	 * *here*, in this thread, and not in the thread pool */
-	if (test_bit(FSCACHE_OP_WAITING, &op->op.flags)) {
-		_debug(">>> WT");
-		fscache_stat(&fscache_n_retrieval_op_waits);
-		if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
-				fscache_wait_bit_interruptible,
-				TASK_INTERRUPTIBLE) < 0) {
-			ret = fscache_cancel_op(&op->op);
-			if (ret == 0) {
-				ret = -ERESTARTSYS;
-				goto error;
-			}
-
-			/* it's been removed from the pending queue by another
-			 * party, so we should get to run shortly */
-			wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
-				    fscache_wait_bit, TASK_UNINTERRUPTIBLE);
-		}
-		_debug("<<< GO");
-	}
+	ret = fscache_wait_for_retrieval_activation(
+		object, op,
+		__fscache_stat(&fscache_n_retrieval_op_waits),
+		__fscache_stat(&fscache_n_retrievals_object_dead));
+	if (ret < 0)
+		goto error;
 
 	/* ask the cache to honour the operation */
 	if (test_bit(FSCACHE_COOKIE_NO_DATA_YET, &object->cookie->flags)) {
@@ -612,25 +623,12 @@
 
 	fscache_stat(&fscache_n_alloc_ops);
 
-	if (test_bit(FSCACHE_OP_WAITING, &op->op.flags)) {
-		_debug(">>> WT");
-		fscache_stat(&fscache_n_alloc_op_waits);
-		if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
-				fscache_wait_bit_interruptible,
-				TASK_INTERRUPTIBLE) < 0) {
-			ret = fscache_cancel_op(&op->op);
-			if (ret == 0) {
-				ret = -ERESTARTSYS;
-				goto error;
-			}
-
-			/* it's been removed from the pending queue by another
-			 * party, so we should get to run shortly */
-			wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
-				    fscache_wait_bit, TASK_UNINTERRUPTIBLE);
-		}
-		_debug("<<< GO");
-	}
+	ret = fscache_wait_for_retrieval_activation(
+		object, op,
+		__fscache_stat(&fscache_n_alloc_op_waits),
+		__fscache_stat(&fscache_n_allocs_object_dead));
+	if (ret < 0)
+		goto error;
 
 	/* ask the cache to honour the operation */
 	fscache_stat(&fscache_n_cop_allocate_page);
diff --git a/fs/fscache/stats.c b/fs/fscache/stats.c
index 9e15289..05f77ca 100644
--- a/fs/fscache/stats.c
+++ b/fs/fscache/stats.c
@@ -39,6 +39,7 @@
 atomic_t fscache_n_allocs_wait;
 atomic_t fscache_n_allocs_nobufs;
 atomic_t fscache_n_allocs_intr;
+atomic_t fscache_n_allocs_object_dead;
 atomic_t fscache_n_alloc_ops;
 atomic_t fscache_n_alloc_op_waits;
 
@@ -49,6 +50,7 @@
 atomic_t fscache_n_retrievals_nobufs;
 atomic_t fscache_n_retrievals_intr;
 atomic_t fscache_n_retrievals_nomem;
+atomic_t fscache_n_retrievals_object_dead;
 atomic_t fscache_n_retrieval_ops;
 atomic_t fscache_n_retrieval_op_waits;
 
@@ -188,9 +190,10 @@
 		   atomic_read(&fscache_n_allocs_wait),
 		   atomic_read(&fscache_n_allocs_nobufs),
 		   atomic_read(&fscache_n_allocs_intr));
-	seq_printf(m, "Allocs : ops=%u owt=%u\n",
+	seq_printf(m, "Allocs : ops=%u owt=%u abt=%u\n",
 		   atomic_read(&fscache_n_alloc_ops),
-		   atomic_read(&fscache_n_alloc_op_waits));
+		   atomic_read(&fscache_n_alloc_op_waits),
+		   atomic_read(&fscache_n_allocs_object_dead));
 
 	seq_printf(m, "Retrvls: n=%u ok=%u wt=%u nod=%u nbf=%u"
 		   " int=%u oom=%u\n",
@@ -201,9 +204,10 @@
 		   atomic_read(&fscache_n_retrievals_nobufs),
 		   atomic_read(&fscache_n_retrievals_intr),
 		   atomic_read(&fscache_n_retrievals_nomem));
-	seq_printf(m, "Retrvls: ops=%u owt=%u\n",
+	seq_printf(m, "Retrvls: ops=%u owt=%u abt=%u\n",
 		   atomic_read(&fscache_n_retrieval_ops),
-		   atomic_read(&fscache_n_retrieval_op_waits));
+		   atomic_read(&fscache_n_retrieval_op_waits),
+		   atomic_read(&fscache_n_retrievals_object_dead));
 
 	seq_printf(m, "Stores : n=%u ok=%u agn=%u nbf=%u oom=%u\n",
 		   atomic_read(&fscache_n_stores),