nfsd4: Move callback setup to callback queue

Instead of creating the new rpc client from a regular server thread,
set a flag, kick off a null call, and allow the null call to do the work
of setting up the client on the callback workqueue.

Use a spinlock to ensure the callback work gets a consistent view of the
callback parameters.

This allows, for example, changing the callback from contexts where
sleeping is not allowed.  I hope it will also keep the locking simple as
we add more session and trunking features, by serializing most of the
callback-specific work.

This also closes a small race where the the new cb_ident could be used
with an old connection (or vice-versa).

Signed-off-by: J. Bruce Fields <bfields@citi.umich.edu>
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index 07c3be6..a269dbe 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -284,7 +284,7 @@
 	struct xdr_stream xdr;
 	struct nfs4_delegation *args = cb->cb_op;
 	struct nfs4_cb_compound_hdr hdr = {
-		.ident = args->dl_ident,
+		.ident = cb->cb_clp->cl_cb_ident,
 		.minorversion = cb->cb_minorversion,
 	};
 
@@ -506,7 +506,8 @@
 			PTR_ERR(client));
 		return PTR_ERR(client);
 	}
-	nfsd4_set_callback_client(clp, client);
+	clp->cl_cb_ident = conn->cb_ident;
+	clp->cl_cb_client = client;
 	return 0;
 
 }
@@ -569,15 +570,12 @@
  */
 void nfsd4_probe_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
 {
-	int status;
-
 	BUG_ON(atomic_read(&clp->cl_cb_set));
 
-	status = setup_callback_client(clp, conn);
-	if (status) {
-		warn_no_callback_path(clp, status);
-		return;
-	}
+	spin_lock(&clp->cl_lock);
+	memcpy(&clp->cl_cb_conn, conn, sizeof(struct nfs4_cb_conn));
+	set_bit(NFSD4_CLIENT_CB_UPDATE, &clp->cl_cb_flags);
+	spin_unlock(&clp->cl_lock);
 	do_probe_callback(clp);
 }
 
@@ -730,19 +728,16 @@
 }
 
 /* must be called under the state lock */
-void nfsd4_set_callback_client(struct nfs4_client *clp, struct rpc_clnt *new)
+void nfsd4_shutdown_callback(struct nfs4_client *clp)
 {
-	struct rpc_clnt *old = clp->cl_cb_client;
-
-	clp->cl_cb_client = new;
+	set_bit(NFSD4_CLIENT_KILL, &clp->cl_cb_flags);
 	/*
-	 * After this, any work that saw the old value of cl_cb_client will
-	 * be gone:
+	 * Note this won't actually result in a null callback;
+	 * instead, nfsd4_do_callback_rpc() will detect the killed
+	 * client, destroy the rpc client, and stop:
 	 */
+	do_probe_callback(clp);
 	flush_workqueue(callback_wq);
-	/* So we can safely shut it down: */
-	if (old)
-		rpc_shutdown_client(old);
 }
 
 void nfsd4_release_cb(struct nfsd4_callback *cb)
@@ -751,15 +746,51 @@
 		cb->cb_ops->rpc_release(cb);
 }
 
+void nfsd4_process_cb_update(struct nfsd4_callback *cb)
+{
+	struct nfs4_cb_conn conn;
+	struct nfs4_client *clp = cb->cb_clp;
+	int err;
+
+	/*
+	 * This is either an update, or the client dying; in either case,
+	 * kill the old client:
+	 */
+	if (clp->cl_cb_client) {
+		rpc_shutdown_client(clp->cl_cb_client);
+		clp->cl_cb_client = NULL;
+	}
+	if (test_bit(NFSD4_CLIENT_KILL, &clp->cl_cb_flags))
+		return;
+	spin_lock(&clp->cl_lock);
+	/*
+	 * Only serialized callback code is allowed to clear these
+	 * flags; main nfsd code can only set them:
+	 */
+	BUG_ON(!clp->cl_cb_flags);
+	clear_bit(NFSD4_CLIENT_CB_UPDATE, &clp->cl_cb_flags);
+	memcpy(&conn, &cb->cb_clp->cl_cb_conn, sizeof(struct nfs4_cb_conn));
+	spin_unlock(&clp->cl_lock);
+
+	err = setup_callback_client(clp, &conn);
+	if (err)
+		warn_no_callback_path(clp, err);
+}
+
 void nfsd4_do_callback_rpc(struct work_struct *w)
 {
 	struct nfsd4_callback *cb = container_of(w, struct nfsd4_callback, cb_work);
 	struct nfs4_client *clp = cb->cb_clp;
-	struct rpc_clnt *clnt = clp->cl_cb_client;
+	struct rpc_clnt *clnt;
 
-	if (clnt == NULL) {
+	if (clp->cl_cb_flags)
+		nfsd4_process_cb_update(cb);
+
+	clnt = clp->cl_cb_client;
+	if (!clnt) {
+		/* Callback channel broken, or client killed; give up: */
 		nfsd4_release_cb(cb);
-		return; /* Client is shutting down; give up. */
+		return;
 	}
 	rpc_call_async(clnt, &cb->cb_msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN,
 			cb->cb_ops, cb);