Merge git://git.linux-nfs.org/pub/linux/nfs-2.6

* git://git.linux-nfs.org/pub/linux/nfs-2.6: (131 commits)
  NFSv4: Fix a typo in nfs_inode_reclaim_delegation
  NFS: Add a boot parameter to disable 64 bit inode numbers
  NFS: nfs_refresh_inode should clear cache_validity flags on success
  NFS: Fix a connectathon regression in NFSv3 and NFSv4
  NFS: Use nfs_refresh_inode() in ops that aren't expected to change the inode
  SUNRPC: Don't call xprt_release in call refresh
  SUNRPC: Don't call xprt_release() if call_allocate fails
  SUNRPC: Fix buggy UDP transmission
  [23/37] Clean up duplicate includes in
  [2.6 patch] net/sunrpc/rpcb_clnt.c: make struct rpcb_program static
  SUNRPC: Use correct type in buffer length calculations
  SUNRPC: Fix default hostname created in rpc_create()
  nfs: add server port to rpc_pipe info file
  NFS: Get rid of some obsolete macros
  NFS: Simplify filehandle revalidation
  NFS: Ensure that nfs_link() returns a hashed dentry
  NFS: Be strict about dentry revalidation when doing exclusive create
  NFS: Don't zap the readdir caches upon error
  NFS: Remove the redundant nfs_reval_fsid()
  NFSv3: Always use directory post-op attributes in nfs3_proc_lookup
  ...

Manually fix up a trivial conflict in net/sunrpc/xprtsock.c caused by
the sock_owned_by_user() cleanup.
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 8ebfc4d..5c69a72 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -5,6 +5,7 @@
 
 obj-$(CONFIG_SUNRPC) += sunrpc.o
 obj-$(CONFIG_SUNRPC_GSS) += auth_gss/
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma/
 
 sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
 	    auth.o auth_null.o auth_unix.o \
diff --git a/net/sunrpc/auth_gss/gss_krb5_wrap.c b/net/sunrpc/auth_gss/gss_krb5_wrap.c
index 42b3220..8bd074d 100644
--- a/net/sunrpc/auth_gss/gss_krb5_wrap.c
+++ b/net/sunrpc/auth_gss/gss_krb5_wrap.c
@@ -42,7 +42,7 @@
 {
 	u8 *ptr;
 	u8 pad;
-	int len = buf->len;
+	size_t len = buf->len;
 
 	if (len <= buf->head[0].iov_len) {
 		pad = *(u8 *)(buf->head[0].iov_base + len - 1);
@@ -53,9 +53,9 @@
 	} else
 		len -= buf->head[0].iov_len;
 	if (len <= buf->page_len) {
-		int last = (buf->page_base + len - 1)
+		unsigned int last = (buf->page_base + len - 1)
 					>>PAGE_CACHE_SHIFT;
-		int offset = (buf->page_base + len - 1)
+		unsigned int offset = (buf->page_base + len - 1)
 					& (PAGE_CACHE_SIZE - 1);
 		ptr = kmap_atomic(buf->pages[last], KM_USER0);
 		pad = *(ptr + offset);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 52429b1..76be83e 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -127,7 +127,14 @@
 	struct rpc_clnt		*clnt = NULL;
 	struct rpc_auth		*auth;
 	int err;
-	int len;
+	size_t len;
+
+	/* sanity check the name before trying to print it */
+	err = -EINVAL;
+	len = strlen(servname);
+	if (len > RPC_MAXNETNAMELEN)
+		goto out_no_rpciod;
+	len++;
 
 	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
 			program->name, servname, xprt);
@@ -148,7 +155,6 @@
 	clnt->cl_parent = clnt;
 
 	clnt->cl_server = clnt->cl_inline_name;
-	len = strlen(servname) + 1;
 	if (len > sizeof(clnt->cl_inline_name)) {
 		char *buf = kmalloc(len, GFP_KERNEL);
 		if (buf != 0)
@@ -234,8 +240,8 @@
 {
 	struct rpc_xprt *xprt;
 	struct rpc_clnt *clnt;
-	struct rpc_xprtsock_create xprtargs = {
-		.proto = args->protocol,
+	struct xprt_create xprtargs = {
+		.ident = args->protocol,
 		.srcaddr = args->saddress,
 		.dstaddr = args->address,
 		.addrlen = args->addrsize,
@@ -253,7 +259,7 @@
 	 */
 	if (args->servername == NULL) {
 		struct sockaddr_in *addr =
-					(struct sockaddr_in *) &args->address;
+					(struct sockaddr_in *) args->address;
 		snprintf(servername, sizeof(servername), NIPQUAD_FMT,
 			NIPQUAD(addr->sin_addr.s_addr));
 		args->servername = servername;
@@ -269,9 +275,6 @@
 	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
 		xprt->resvport = 0;
 
-	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
-			args->program->name, args->servername, xprt);
-
 	clnt = rpc_new_client(xprt, args->servername, args->program,
 				args->version, args->authflavor);
 	if (IS_ERR(clnt))
@@ -439,7 +442,7 @@
  */
 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
 				      struct rpc_program *program,
-				      int vers)
+				      u32 vers)
 {
 	struct rpc_clnt *clnt;
 	struct rpc_version *version;
@@ -843,8 +846,7 @@
 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
 
 	if (RPC_IS_ASYNC(task) || !signalled()) {
-		xprt_release(task);
-		task->tk_action = call_reserve;
+		task->tk_action = call_allocate;
 		rpc_delay(task, HZ>>4);
 		return;
 	}
@@ -871,6 +873,7 @@
 	buf->head[0].iov_len = len;
 	buf->tail[0].iov_len = 0;
 	buf->page_len = 0;
+	buf->flags = 0;
 	buf->len = 0;
 	buf->buflen = len;
 }
@@ -937,7 +940,7 @@
 static void
 call_bind_status(struct rpc_task *task)
 {
-	int status = -EACCES;
+	int status = -EIO;
 
 	if (task->tk_status >= 0) {
 		dprint_status(task);
@@ -947,9 +950,20 @@
 	}
 
 	switch (task->tk_status) {
+	case -EAGAIN:
+		dprintk("RPC: %5u rpcbind waiting for another request "
+				"to finish\n", task->tk_pid);
+		/* avoid busy-waiting here -- could be a network outage. */
+		rpc_delay(task, 5*HZ);
+		goto retry_timeout;
 	case -EACCES:
 		dprintk("RPC: %5u remote rpcbind: RPC program/version "
 				"unavailable\n", task->tk_pid);
+		/* fail immediately if this is an RPC ping */
+		if (task->tk_msg.rpc_proc->p_proc == 0) {
+			status = -EOPNOTSUPP;
+			break;
+		}
 		rpc_delay(task, 3*HZ);
 		goto retry_timeout;
 	case -ETIMEDOUT:
@@ -957,6 +971,7 @@
 				task->tk_pid);
 		goto retry_timeout;
 	case -EPFNOSUPPORT:
+		/* server doesn't support any rpcbind version we know of */
 		dprintk("RPC: %5u remote rpcbind service unavailable\n",
 				task->tk_pid);
 		break;
@@ -969,7 +984,6 @@
 	default:
 		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
 				task->tk_pid, -task->tk_status);
-		status = -EIO;
 	}
 
 	rpc_exit(task, status);
@@ -1257,7 +1271,6 @@
 {
 	dprint_status(task);
 
-	xprt_release(task);	/* Must do to obtain new XID */
 	task->tk_action = call_refreshresult;
 	task->tk_status = 0;
 	task->tk_client->cl_stats->rpcauthrefresh++;
@@ -1375,6 +1388,8 @@
 			dprintk("RPC: %5u %s: retry stale creds\n",
 					task->tk_pid, __FUNCTION__);
 			rpcauth_invalcred(task);
+			/* Ensure we obtain a new XID! */
+			xprt_release(task);
 			task->tk_action = call_refresh;
 			goto out_retry;
 		case RPC_AUTH_BADCRED:
@@ -1523,13 +1538,18 @@
 		spin_lock(&clnt->cl_lock);
 		list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
 			const char *rpc_waitq = "none";
+			int proc;
+
+			if (t->tk_msg.rpc_proc)
+				proc = t->tk_msg.rpc_proc->p_proc;
+			else
+				proc = -1;
 
 			if (RPC_IS_QUEUED(t))
 				rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
 
 			printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
-				t->tk_pid,
-				(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
+				t->tk_pid, proc,
 				t->tk_flags, t->tk_status,
 				t->tk_client,
 				(t->tk_client ? t->tk_client->cl_prog : 0),
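
The clnt.c hunk above moves the server-name length check to the very top
of rpc_new_client(), before the name is printed or copied, and the copy
itself uses a small-string optimization: names that fit in cl_inline_name
avoid a kmalloc(). A minimal user-space sketch of that same pattern (the
struct, buffer size, and names below are illustrative stand-ins, not the
kernel's):

	#include <stdlib.h>
	#include <string.h>

	#define NAME_MAX_LEN 255		/* stand-in for RPC_MAXNETNAMELEN */

	struct client {
		char *server;			/* points at inline_name or heap */
		char inline_name[32];		/* short names avoid malloc() */
	};

	static int client_set_server(struct client *clnt, const char *servname)
	{
		size_t len;

		/* sanity check the name before trying to use it */
		len = strlen(servname);
		if (len > NAME_MAX_LEN)
			return -1;
		len++;				/* count the NUL terminator */

		clnt->server = clnt->inline_name;
		if (len > sizeof(clnt->inline_name)) {
			char *buf = malloc(len);
			if (buf != NULL)
				clnt->server = buf;
			else			/* fall back and truncate */
				len = sizeof(clnt->inline_name);
		}
		memcpy(clnt->server, servname, len - 1);
		clnt->server[len - 1] = '\0';
		return 0;
	}
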
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 669e12a..c8433e8 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -14,7 +14,7 @@
 #include <linux/pagemap.h>
 #include <linux/mount.h>
 #include <linux/namei.h>
-#include <linux/dnotify.h>
+#include <linux/fsnotify.h>
 #include <linux/kernel.h>
 
 #include <asm/ioctls.h>
@@ -329,6 +329,7 @@
 			clnt->cl_prog, clnt->cl_vers);
 	seq_printf(m, "address: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
 	seq_printf(m, "protocol: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_PROTO));
+	seq_printf(m, "port: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_PORT));
 	return 0;
 }
 
@@ -585,6 +586,7 @@
 		if (S_ISDIR(mode))
 			inc_nlink(dir);
 		d_add(dentry, inode);
+		fsnotify_create(dir, dentry);
 	}
 	mutex_unlock(&dir->i_mutex);
 	return 0;
@@ -606,7 +608,7 @@
 	inode->i_ino = iunique(dir->i_sb, 100);
 	d_instantiate(dentry, inode);
 	inc_nlink(dir);
-	inode_dir_notify(dir, DN_CREATE);
+	fsnotify_mkdir(dir, dentry);
 	return 0;
 out_err:
 	printk(KERN_WARNING "%s: %s failed to allocate inode for dentry %s\n",
@@ -748,7 +750,7 @@
 	rpci->flags = flags;
 	rpci->ops = ops;
 	rpci->nkern_readwriters = 1;
-	inode_dir_notify(dir, DN_CREATE);
+	fsnotify_create(dir, dentry);
 	dget(dentry);
 out:
 	mutex_unlock(&dir->i_mutex);
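
The rpc_pipe.c hunks replace the dnotify-only inode_dir_notify(dir,
DN_CREATE) calls with the generic fsnotify hooks, so inotify watchers of
rpc_pipefs see new entries too. Sketched below for a hypothetical
pseudo-filesystem helper; fsnotify_create() and fsnotify_mkdir() are the
real hooks, everything else is illustrative:

	#include <linux/fs.h>
	#include <linux/fsnotify.h>

	/* Instantiate a new inode and tell all watchers of dir about it. */
	static void pipefs_announce(struct inode *dir, struct dentry *dentry,
				    struct inode *inode, int isdir)
	{
		d_add(dentry, inode);		/* hash dentry, attach inode */
		if (isdir) {
			inc_nlink(dir);		/* new subdir: bump ".." link */
			fsnotify_mkdir(dir, dentry);
		} else {
			fsnotify_create(dir, dentry);
		}
	}
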
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index d1740db..a05493a 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -16,11 +16,14 @@
 
 #include <linux/types.h>
 #include <linux/socket.h>
+#include <linux/in.h>
+#include <linux/in6.h>
 #include <linux/kernel.h>
 #include <linux/errno.h>
 
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/xprtsock.h>
 
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY	RPCDBG_BIND
@@ -91,26 +94,6 @@
 #define RPCB_MAXADDRLEN		(128u)
 
 /*
- * r_netid
- *
- * Quoting RFC 3530, section 2.2:
- *
- * For TCP over IPv4 the value of r_netid is the string "tcp".  For UDP
- * over IPv4 the value of r_netid is the string "udp".
- *
- * ...
- *
- * For TCP over IPv6 the value of r_netid is the string "tcp6".  For UDP
- * over IPv6 the value of r_netid is the string "udp6".
- */
-#define RPCB_NETID_UDP	"\165\144\160"		/* "udp" */
-#define RPCB_NETID_TCP	"\164\143\160"		/* "tcp" */
-#define RPCB_NETID_UDP6	"\165\144\160\066"	/* "udp6" */
-#define RPCB_NETID_TCP6	"\164\143\160\066"	/* "tcp6" */
-
-#define RPCB_MAXNETIDLEN	(4u)
-
-/*
  * r_owner
  *
  * The "owner" is allowed to unset a service in the rpcbind database.
@@ -120,7 +103,7 @@
 #define RPCB_MAXOWNERLEN	sizeof(RPCB_OWNER_STRING)
 
 static void			rpcb_getport_done(struct rpc_task *, void *);
-extern struct rpc_program	rpcb_program;
+static struct rpc_program	rpcb_program;
 
 struct rpcbind_args {
 	struct rpc_xprt *	r_xprt;
@@ -137,10 +120,13 @@
 static struct rpc_procinfo rpcb_procedures2[];
 static struct rpc_procinfo rpcb_procedures3[];
 
-static struct rpcb_info {
+struct rpcb_info {
 	int			rpc_vers;
 	struct rpc_procinfo *	rpc_proc;
-} rpcb_next_version[];
+};
+
+static struct rpcb_info rpcb_next_version[];
+static struct rpcb_info rpcb_next_version6[];
 
 static void rpcb_getport_prepare(struct rpc_task *task, void *calldata)
 {
@@ -190,7 +176,17 @@
 				   RPC_CLNT_CREATE_INTR),
 	};
 
-	((struct sockaddr_in *)srvaddr)->sin_port = htons(RPCBIND_PORT);
+	switch (srvaddr->sa_family) {
+	case AF_INET:
+		((struct sockaddr_in *)srvaddr)->sin_port = htons(RPCBIND_PORT);
+		break;
+	case AF_INET6:
+		((struct sockaddr_in6 *)srvaddr)->sin6_port = htons(RPCBIND_PORT);
+		break;
+	default:
+		return NULL;
+	}
+
 	if (!privileged)
 		args.flags |= RPC_CLNT_CREATE_NONPRIVPORT;
 	return rpc_create(&args);
@@ -234,7 +230,7 @@
 			prog, vers, prot, port);
 
 	rpcb_clnt = rpcb_create("localhost", (struct sockaddr *) &sin,
-					IPPROTO_UDP, 2, 1);
+					XPRT_TRANSPORT_UDP, 2, 1);
 	if (IS_ERR(rpcb_clnt))
 		return PTR_ERR(rpcb_clnt);
 
@@ -316,6 +312,7 @@
 	struct rpc_task	*child;
 	struct sockaddr addr;
 	int status;
+	struct rpcb_info *info;
 
 	dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
 		task->tk_pid, __FUNCTION__,
@@ -325,7 +322,7 @@
 	BUG_ON(clnt->cl_parent != clnt);
 
 	if (xprt_test_and_set_binding(xprt)) {
-		status = -EACCES;		/* tell caller to check again */
+		status = -EAGAIN;	/* tell caller to check again */
 		dprintk("RPC: %5u %s: waiting for another binder\n",
 			task->tk_pid, __FUNCTION__);
 		goto bailout_nowake;
@@ -343,18 +340,43 @@
 		goto bailout_nofree;
 	}
 
-	if (rpcb_next_version[xprt->bind_index].rpc_proc == NULL) {
+	rpc_peeraddr(clnt, (void *)&addr, sizeof(addr));
+
+	/* Don't ever use rpcbind v2 for AF_INET6 requests */
+	switch (addr.sa_family) {
+	case AF_INET:
+		info = rpcb_next_version;
+		break;
+	case AF_INET6:
+		info = rpcb_next_version6;
+		break;
+	default:
+		status = -EAFNOSUPPORT;
+		dprintk("RPC: %5u %s: bad address family\n",
+				task->tk_pid, __FUNCTION__);
+		goto bailout_nofree;
+	}
+	if (info[xprt->bind_index].rpc_proc == NULL) {
 		xprt->bind_index = 0;
-		status = -EACCES;	/* tell caller to try again later */
+		status = -EPFNOSUPPORT;
 		dprintk("RPC: %5u %s: no more getport versions available\n",
 			task->tk_pid, __FUNCTION__);
 		goto bailout_nofree;
 	}
-	bind_version = rpcb_next_version[xprt->bind_index].rpc_vers;
+	bind_version = info[xprt->bind_index].rpc_vers;
 
 	dprintk("RPC: %5u %s: trying rpcbind version %u\n",
 		task->tk_pid, __FUNCTION__, bind_version);
 
+	rpcb_clnt = rpcb_create(clnt->cl_server, &addr, xprt->prot,
+				bind_version, 0);
+	if (IS_ERR(rpcb_clnt)) {
+		status = PTR_ERR(rpcb_clnt);
+		dprintk("RPC: %5u %s: rpcb_create failed, error %ld\n",
+			task->tk_pid, __FUNCTION__, PTR_ERR(rpcb_clnt));
+		goto bailout_nofree;
+	}
+
 	map = kzalloc(sizeof(struct rpcbind_args), GFP_ATOMIC);
 	if (!map) {
 		status = -ENOMEM;
@@ -367,28 +389,19 @@
 	map->r_prot = xprt->prot;
 	map->r_port = 0;
 	map->r_xprt = xprt_get(xprt);
-	map->r_netid = (xprt->prot == IPPROTO_TCP) ? RPCB_NETID_TCP :
-						   RPCB_NETID_UDP;
-	memcpy(&map->r_addr, rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR),
-			sizeof(map->r_addr));
+	map->r_netid = rpc_peeraddr2str(clnt, RPC_DISPLAY_NETID);
+	memcpy(map->r_addr,
+	       rpc_peeraddr2str(rpcb_clnt, RPC_DISPLAY_UNIVERSAL_ADDR),
+	       sizeof(map->r_addr));
 	map->r_owner = RPCB_OWNER_STRING;	/* ignored for GETADDR */
 
-	rpc_peeraddr(clnt, (void *)&addr, sizeof(addr));
-	rpcb_clnt = rpcb_create(clnt->cl_server, &addr, xprt->prot, bind_version, 0);
-	if (IS_ERR(rpcb_clnt)) {
-		status = PTR_ERR(rpcb_clnt);
-		dprintk("RPC: %5u %s: rpcb_create failed, error %ld\n",
-			task->tk_pid, __FUNCTION__, PTR_ERR(rpcb_clnt));
-		goto bailout;
-	}
-
 	child = rpc_run_task(rpcb_clnt, RPC_TASK_ASYNC, &rpcb_getport_ops, map);
 	rpc_release_client(rpcb_clnt);
 	if (IS_ERR(child)) {
 		status = -EIO;
 		dprintk("RPC: %5u %s: rpc_run_task failed\n",
 			task->tk_pid, __FUNCTION__);
-		goto bailout_nofree;
+		goto bailout;
 	}
 	rpc_put_task(child);
 
@@ -403,6 +416,7 @@
 bailout_nowake:
 	task->tk_status = status;
 }
+EXPORT_SYMBOL_GPL(rpcb_getport_async);
 
 /*
  * Rpcbind child task calls this callback via tk_exit.
@@ -413,6 +427,10 @@
 	struct rpc_xprt *xprt = map->r_xprt;
 	int status = child->tk_status;
 
+	/* Garbage reply: retry with a lesser rpcbind version */
+	if (status == -EIO)
+		status = -EPROTONOSUPPORT;
+
 	/* rpcbind server doesn't support this rpcbind protocol version */
 	if (status == -EPROTONOSUPPORT)
 		xprt->bind_index++;
@@ -490,16 +508,24 @@
 			       unsigned short *portp)
 {
 	char *addr;
-	int addr_len, c, i, f, first, val;
+	u32 addr_len;
+	int c, i, f, first, val;
 
 	*portp = 0;
-	addr_len = (unsigned int) ntohl(*p++);
-	if (addr_len > RPCB_MAXADDRLEN)			/* sanity */
-		return -EINVAL;
+	addr_len = ntohl(*p++);
 
-	dprintk("RPC:       rpcb_decode_getaddr returned string: '%s'\n",
-			(char *) p);
+	/*
+	 * Simple sanity check.  The smallest possible universal
+	 * address is an IPv4 address string containing 11 bytes.
+	 */
+	if (addr_len < 11 || addr_len > RPCB_MAXADDRLEN)
+		goto out_err;
 
+	/*
+	 * Start at the end and walk backwards until the first dot
+	 * is encountered.  When the second dot is found, we have
+	 * both parts of the port number.
+	 */
 	addr = (char *)p;
 	val = 0;
 	first = 1;
@@ -521,8 +547,19 @@
 		}
 	}
 
+	/*
+	 * Simple sanity check.  If we never saw a dot in the reply,
+	 * then this was probably just garbage.
+	 */
+	if (first)
+		goto out_err;
+
 	dprintk("RPC:       rpcb_decode_getaddr port=%u\n", *portp);
 	return 0;
+
+out_err:
+	dprintk("RPC:       rpcbind server returned malformed reply\n");
+	return -EIO;
 }
 
 #define RPCB_program_sz		(1u)
@@ -531,7 +568,7 @@
 #define RPCB_port_sz		(1u)
 #define RPCB_boolean_sz		(1u)
 
-#define RPCB_netid_sz		(1+XDR_QUADLEN(RPCB_MAXNETIDLEN))
+#define RPCB_netid_sz		(1+XDR_QUADLEN(RPCBIND_MAXNETIDLEN))
 #define RPCB_addr_sz		(1+XDR_QUADLEN(RPCB_MAXADDRLEN))
 #define RPCB_ownerstring_sz	(1+XDR_QUADLEN(RPCB_MAXOWNERLEN))
 
@@ -593,6 +630,14 @@
 	{ 0, NULL },
 };
 
+static struct rpcb_info rpcb_next_version6[] = {
+#ifdef CONFIG_SUNRPC_BIND34
+	{ 4, &rpcb_procedures4[RPCBPROC_GETVERSADDR] },
+	{ 3, &rpcb_procedures3[RPCBPROC_GETADDR] },
+#endif
+	{ 0, NULL },
+};
+
 static struct rpc_version rpcb_version2 = {
 	.number		= 2,
 	.nrprocs	= RPCB_HIGHPROC_2,
@@ -621,7 +666,7 @@
 
 static struct rpc_stat rpcb_stats;
 
-struct rpc_program rpcb_program = {
+static struct rpc_program rpcb_program = {
 	.name		= "rpcbind",
 	.number		= RPCBIND_PROGRAM,
 	.nrvers		= ARRAY_SIZE(rpcb_version),
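
rpcb_decode_getaddr() above parses an RFC 3530 universal address,
"h1.h2.h3.h4.p1.p2", where the last two dotted fields carry the port
(p1 * 256 + p2); it walks the string backwards and stops at the second
dot. A standalone, runnable sketch of that walk (function and variable
names here are hypothetical):

	#include <stdio.h>
	#include <string.h>

	static int uaddr_to_port(const char *addr, int len, unsigned short *portp)
	{
		int val = 0, place = 1, dots = 0, i;

		*portp = 0;
		if (len < 11)			/* "0.0.0.0.0.0" is the minimum */
			return -1;

		for (i = len - 1; i >= 0; i--) {
			char c = addr[i];
			if (c >= '0' && c <= '9') {
				val += (c - '0') * place;
				place *= 10;
			} else if (c == '.') {
				if (++dots == 1) {	/* p2: low-order byte */
					*portp = val;
				} else {		/* p1: high-order byte */
					*portp |= val << 8;
					break;
				}
				val = 0;
				place = 1;
			} else
				return -1;		/* garbage character */
		}
		return dots >= 2 ? 0 : -1;	/* no second dot: garbage reply */
	}

	int main(void)
	{
		unsigned short port;
		const char *uaddr = "192.168.1.20.8.1";	/* 8*256 + 1 = 2049 */

		if (uaddr_to_port(uaddr, (int)strlen(uaddr), &port) == 0)
			printf("%s -> port %u\n", uaddr, port);
		return 0;
	}
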
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 954d7ec..3c773c5 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -777,6 +777,7 @@
 			task->tk_pid, size, buf);
 	return &buf->data;
 }
+EXPORT_SYMBOL_GPL(rpc_malloc);
 
 /**
  * rpc_free - free buffer allocated via rpc_malloc
@@ -802,6 +803,7 @@
 	else
 		kfree(buf);
 }
+EXPORT_SYMBOL_GPL(rpc_free);
 
 /*
  * Creation and deletion of RPC task structures
diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c
index 1d377d1..97ac45f0 100644
--- a/net/sunrpc/socklib.c
+++ b/net/sunrpc/socklib.c
@@ -34,6 +34,7 @@
 	desc->offset += len;
 	return len;
 }
+EXPORT_SYMBOL_GPL(xdr_skb_read_bits);
 
 /**
  * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer
@@ -137,6 +138,7 @@
 out:
 	return copied;
 }
+EXPORT_SYMBOL_GPL(xdr_partial_copy_from_skb);
 
 /**
  * csum_partial_copy_to_xdr - checksum and copy data
@@ -179,3 +181,4 @@
 		return -1;
 	return 0;
 }
+EXPORT_SYMBOL_GPL(csum_partial_copy_to_xdr);
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index 384c4ad..33d89e8 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -20,7 +20,7 @@
 #include <linux/sunrpc/auth.h>
 #include <linux/workqueue.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
-
+#include <linux/sunrpc/xprtsock.h>
 
 /* RPC scheduler */
 EXPORT_SYMBOL(rpc_execute);
diff --git a/net/sunrpc/timer.c b/net/sunrpc/timer.c
index 8142fdb..31becbf 100644
--- a/net/sunrpc/timer.c
+++ b/net/sunrpc/timer.c
@@ -17,6 +17,7 @@
 
 #include <linux/types.h>
 #include <linux/unistd.h>
+#include <linux/module.h>
 
 #include <linux/sunrpc/clnt.h>
 
@@ -40,6 +41,7 @@
 		rt->ntimeouts[i] = 0;
 	}
 }
+EXPORT_SYMBOL_GPL(rpc_init_rtt);
 
 /*
  * NB: When computing the smoothed RTT and standard deviation,
@@ -75,6 +77,7 @@
 	if (*sdrtt < RPC_RTO_MIN)
 		*sdrtt = RPC_RTO_MIN;
 }
+EXPORT_SYMBOL_GPL(rpc_update_rtt);
 
 /*
 * Estimate rto for an nfs rpc sent via an unreliable datagram.
@@ -103,3 +106,4 @@
 
 	return res;
 }
+EXPORT_SYMBOL_GPL(rpc_calc_rto);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index c8c2edc..282a9a2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -62,6 +62,9 @@
 static void	xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
+static DEFINE_SPINLOCK(xprt_list_lock);
+static LIST_HEAD(xprt_list);
+
 /*
  * The transport code maintains an estimate on the maximum number of out-
  * standing RPC requests, using a smoothed version of the congestion
@@ -81,6 +84,78 @@
 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
 
 /**
+ * xprt_register_transport - register a transport implementation
+ * @transport: transport to register
+ *
+ * If a transport implementation is loaded as a kernel module, it can
+ * call this interface to make itself known to the RPC client.
+ *
+ * Returns:
+ * 0:		transport successfully registered
+ * -EEXIST:	transport already registered
+ * -EINVAL:	transport module being unloaded
+ */
+int xprt_register_transport(struct xprt_class *transport)
+{
+	struct xprt_class *t;
+	int result;
+
+	result = -EEXIST;
+	spin_lock(&xprt_list_lock);
+	list_for_each_entry(t, &xprt_list, list) {
+		/* don't register the same transport class twice */
+		if (t->ident == transport->ident)
+			goto out;
+	}
+
+	result = -EINVAL;
+	if (try_module_get(THIS_MODULE)) {
+		list_add_tail(&transport->list, &xprt_list);
+		printk(KERN_INFO "RPC: Registered %s transport module.\n",
+			transport->name);
+		result = 0;
+	}
+
+out:
+	spin_unlock(&xprt_list_lock);
+	return result;
+}
+EXPORT_SYMBOL_GPL(xprt_register_transport);
+
+/**
+ * xprt_unregister_transport - unregister a transport implementation
+ * @transport: transport to unregister
+ *
+ * Returns:
+ * 0:		transport successfully unregistered
+ * -ENOENT:	transport never registered
+ */
+int xprt_unregister_transport(struct xprt_class *transport)
+{
+	struct xprt_class *t;
+	int result;
+
+	result = 0;
+	spin_lock(&xprt_list_lock);
+	list_for_each_entry(t, &xprt_list, list) {
+		if (t == transport) {
+			printk(KERN_INFO
+				"RPC: Unregistered %s transport module.\n",
+				transport->name);
+			list_del_init(&transport->list);
+			module_put(THIS_MODULE);
+			goto out;
+		}
+	}
+	result = -ENOENT;
+
+out:
+	spin_unlock(&xprt_list_lock);
+	return result;
+}
+EXPORT_SYMBOL_GPL(xprt_unregister_transport);
+
+/**
  * xprt_reserve_xprt - serialize write access to transports
  * @task: task that is requesting access to the transport
  *
@@ -118,6 +193,7 @@
 		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
 	return 0;
 }
+EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
 
 static void xprt_clear_locked(struct rpc_xprt *xprt)
 {
@@ -167,6 +243,7 @@
 		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
 	return 0;
 }
+EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
 
 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
@@ -246,6 +323,7 @@
 		__xprt_lock_write_next(xprt);
 	}
 }
+EXPORT_SYMBOL_GPL(xprt_release_xprt);
 
 /**
  * xprt_release_xprt_cong - allow other requests to use a transport
@@ -262,6 +340,7 @@
 		__xprt_lock_write_next_cong(xprt);
 	}
 }
+EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
 
 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
@@ -314,6 +393,7 @@
 {
 	__xprt_put_cong(task->tk_xprt, task->tk_rqstp);
 }
+EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
 
 /**
  * xprt_adjust_cwnd - adjust transport congestion window
@@ -345,6 +425,7 @@
 	xprt->cwnd = cwnd;
 	__xprt_put_cong(xprt, req);
 }
+EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
 
 /**
  * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
@@ -359,6 +440,7 @@
 	else
 		rpc_wake_up(&xprt->pending);
 }
+EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
 
 /**
  * xprt_wait_for_buffer_space - wait for transport output buffer to clear
@@ -373,6 +455,7 @@
 	task->tk_timeout = req->rq_timeout;
 	rpc_sleep_on(&xprt->pending, task, NULL, NULL);
 }
+EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
 
 /**
  * xprt_write_space - wake the task waiting for transport output buffer space
@@ -393,6 +476,7 @@
 	}
 	spin_unlock_bh(&xprt->transport_lock);
 }
+EXPORT_SYMBOL_GPL(xprt_write_space);
 
 /**
  * xprt_set_retrans_timeout_def - set a request's retransmit timeout
@@ -406,6 +490,7 @@
 {
 	task->tk_timeout = task->tk_rqstp->rq_timeout;
 }
+EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
 
 /*
  * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
@@ -425,6 +510,7 @@
 	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
 		task->tk_timeout = max_timeout;
 }
+EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
 
 static void xprt_reset_majortimeo(struct rpc_rqst *req)
 {
@@ -500,6 +586,7 @@
 	xprt_wake_pending_tasks(xprt, -ENOTCONN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
+EXPORT_SYMBOL_GPL(xprt_disconnect);
 
 static void
 xprt_init_autodisconnect(unsigned long data)
@@ -610,6 +697,7 @@
 	xprt->stat.bad_xids++;
 	return NULL;
 }
+EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
 /**
  * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
@@ -629,6 +717,7 @@
 		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
 	}
 }
+EXPORT_SYMBOL_GPL(xprt_update_rtt);
 
 /**
  * xprt_complete_rqst - called when reply processing is complete
@@ -653,6 +742,7 @@
 	req->rq_received = req->rq_private_buf.len = copied;
 	rpc_wake_up_task(task);
 }
+EXPORT_SYMBOL_GPL(xprt_complete_rqst);
 
 static void xprt_timer(struct rpc_task *task)
 {
@@ -889,23 +979,25 @@
  * @args: rpc transport creation arguments
  *
  */
-struct rpc_xprt *xprt_create_transport(struct rpc_xprtsock_create *args)
+struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
 {
 	struct rpc_xprt	*xprt;
 	struct rpc_rqst	*req;
+	struct xprt_class *t;
 
-	switch (args->proto) {
-	case IPPROTO_UDP:
-		xprt = xs_setup_udp(args);
-		break;
-	case IPPROTO_TCP:
-		xprt = xs_setup_tcp(args);
-		break;
-	default:
-		printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
-				args->proto);
-		return ERR_PTR(-EIO);
+	spin_lock(&xprt_list_lock);
+	list_for_each_entry(t, &xprt_list, list) {
+		if (t->ident == args->ident) {
+			spin_unlock(&xprt_list_lock);
+			goto found;
+		}
 	}
+	spin_unlock(&xprt_list_lock);
+	printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
+	return ERR_PTR(-EIO);
+
+found:
+	xprt = t->setup(args);
 	if (IS_ERR(xprt)) {
 		dprintk("RPC:       xprt_create_transport: failed, %ld\n",
 				-PTR_ERR(xprt));
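
With the registry above in place, xprt_create_transport() no longer
switches on IPPROTO_* values: it looks the ident up in xprt_list and
calls the matching setup routine. A loadable transport wires itself in
roughly as sketched here; XPRT_TRANSPORT_FOO and xs_setup_foo() are
hypothetical, and only the xprt_class fields visible in this diff are
used (the full definition lives in linux/sunrpc/xprt.h):

	#include <linux/module.h>
	#include <linux/err.h>
	#include <linux/sunrpc/xprt.h>

	static struct rpc_xprt *xs_setup_foo(struct xprt_create *args)
	{
		return ERR_PTR(-ENOTSUPP);  /* real code allocates an rpc_xprt */
	}

	static struct xprt_class xs_foo_transport = {
		.list	= LIST_HEAD_INIT(xs_foo_transport.list),
		.name	= "foo",
		.ident	= XPRT_TRANSPORT_FOO,	/* must match xprt_create.ident */
		.setup	= xs_setup_foo,
	};

	static int __init xs_foo_init(void)
	{
		/* -EEXIST if another class already claimed this ident */
		return xprt_register_transport(&xs_foo_transport);
	}

	static void __exit xs_foo_exit(void)
	{
		xprt_unregister_transport(&xs_foo_transport);
	}

	module_init(xs_foo_init);
	module_exit(xs_foo_exit);
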
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
new file mode 100644
index 0000000..264f0fe
--- /dev/null
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -0,0 +1,3 @@
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o
+
+xprtrdma-y := transport.o rpc_rdma.o verbs.o
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
new file mode 100644
index 0000000..12db635
--- /dev/null
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -0,0 +1,868 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rpc_rdma.c
+ *
+ * This file contains the guts of the RPC RDMA protocol, and
+ * does marshaling/unmarshaling, etc. It is also where interfacing
+ * to the Linux RPC framework lives.
+ */
+
+#include "xprt_rdma.h"
+
+#include <linux/highmem.h>
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+enum rpcrdma_chunktype {
+	rpcrdma_noch = 0,
+	rpcrdma_readch,
+	rpcrdma_areadch,
+	rpcrdma_writech,
+	rpcrdma_replych
+};
+
+#ifdef RPC_DEBUG
+static const char transfertypes[][12] = {
+	"pure inline",	/* no chunks */
+	" read chunk",	/* some argument via rdma read */
+	"*read chunk",	/* entire request via rdma read */
+	"write chunk",	/* some result via rdma write */
+	"reply chunk"	/* entire reply via rdma write */
+};
+#endif
+
+/*
+ * Chunk assembly from upper layer xdr_buf.
+ *
+ * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
+ * elements. Segments are then coalesced when registered, if possible
+ * within the selected memreg mode.
+ *
+ * Note, this routine is never called if the connection's memory
+ * registration strategy is 0 (bounce buffers).
+ */
+
+static int
+rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int pos,
+	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+{
+	int len, n = 0, p;
+
+	if (pos == 0 && xdrbuf->head[0].iov_len) {
+		seg[n].mr_page = NULL;
+		seg[n].mr_offset = xdrbuf->head[0].iov_base;
+		seg[n].mr_len = xdrbuf->head[0].iov_len;
+		pos += xdrbuf->head[0].iov_len;
+		++n;
+	}
+
+	if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
+		if (n == nsegs)
+			return 0;
+		seg[n].mr_page = xdrbuf->pages[0];
+		seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
+		seg[n].mr_len = min_t(u32,
+			PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len);
+		len = xdrbuf->page_len - seg[n].mr_len;
+		pos += len;
+		++n;
+		p = 1;
+		while (len > 0) {
+			if (n == nsegs)
+				return 0;
+			seg[n].mr_page = xdrbuf->pages[p];
+			seg[n].mr_offset = NULL;
+			seg[n].mr_len = min_t(u32, PAGE_SIZE, len);
+			len -= seg[n].mr_len;
+			++n;
+			++p;
+		}
+	}
+
+	if (pos < xdrbuf->len && xdrbuf->tail[0].iov_len) {
+		if (n == nsegs)
+			return 0;
+		seg[n].mr_page = NULL;
+		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
+		seg[n].mr_len = xdrbuf->tail[0].iov_len;
+		pos += xdrbuf->tail[0].iov_len;
+		++n;
+	}
+
+	if (pos < xdrbuf->len)
+		dprintk("RPC:       %s: marshaled only %d of %d\n",
+				__func__, pos, xdrbuf->len);
+
+	return n;
+}
+
+/*
+ * Create read/write chunk lists, and reply chunks, for RDMA
+ *
+ *   Assume check against THRESHOLD has been done, and chunks are required.
+ *   Assume only encoding one list entry for read|write chunks. The NFSv3
+ *     protocol is simple enough to allow this as it only has a single "bulk
+ *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
+ *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
+ *
+ * When used for a single reply chunk (which is a special write
+ * chunk used for the entire reply, rather than just the data), it
+ * is used primarily for READDIR and READLINK which would otherwise
+ * be severely size-limited by a small rdma inline read max. The server
+ * response will come back as an RDMA Write, followed by a message
+ * of type RDMA_NOMSG carrying the xid and length. As a result, reply
+ * chunks do not provide data alignment, however they do not require
+ * "fixup" (moving the response to the upper layer buffer) either.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ */
+
+static unsigned int
+rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
+		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+{
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
+	int nsegs, nchunks = 0;
+	int pos;
+	struct rpcrdma_mr_seg *seg = req->rl_segments;
+	struct rpcrdma_read_chunk *cur_rchunk = NULL;
+	struct rpcrdma_write_array *warray = NULL;
+	struct rpcrdma_write_chunk *cur_wchunk = NULL;
+	u32 *iptr = headerp->rm_body.rm_chunks;
+
+	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
+		/* a read chunk - server will RDMA Read our memory */
+		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
+	} else {
+		/* a write or reply chunk - server will RDMA Write our memory */
+		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
+		if (type == rpcrdma_replych)
+			*iptr++ = xdr_zero;	/* a NULL write chunk list */
+		warray = (struct rpcrdma_write_array *) iptr;
+		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
+	}
+
+	if (type == rpcrdma_replych || type == rpcrdma_areadch)
+		pos = 0;
+	else
+		pos = target->head[0].iov_len;
+
+	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+	if (nsegs == 0)
+		return 0;
+
+	do {
+		/* bind/register the memory, then build chunk from result. */
+		int n = rpcrdma_register_external(seg, nsegs,
+						cur_wchunk != NULL, r_xprt);
+		if (n <= 0)
+			goto out;
+		if (cur_rchunk) {	/* read */
+			cur_rchunk->rc_discrim = xdr_one;
+			/* all read chunks have the same "position" */
+			cur_rchunk->rc_position = htonl(pos);
+			cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
+			cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+			xdr_encode_hyper(
+					(u32 *)&cur_rchunk->rc_target.rs_offset,
+					seg->mr_base);
+			dprintk("RPC:       %s: read chunk "
+				"elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
+				seg->mr_len, seg->mr_base, seg->mr_rkey, pos,
+				n < nsegs ? "more" : "last");
+			cur_rchunk++;
+			r_xprt->rx_stats.read_chunk_count++;
+		} else {		/* write/reply */
+			cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
+			cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+			xdr_encode_hyper(
+					(u32 *)&cur_wchunk->wc_target.rs_offset,
+					seg->mr_base);
+			dprintk("RPC:       %s: %s chunk "
+				"elem %d@0x%llx:0x%x (%s)\n", __func__,
+				(type == rpcrdma_replych) ? "reply" : "write",
+				seg->mr_len, seg->mr_base, seg->mr_rkey,
+				n < nsegs ? "more" : "last");
+			cur_wchunk++;
+			if (type == rpcrdma_replych)
+				r_xprt->rx_stats.reply_chunk_count++;
+			else
+				r_xprt->rx_stats.write_chunk_count++;
+			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		}
+		nchunks++;
+		seg   += n;
+		nsegs -= n;
+	} while (nsegs);
+
+	/* success. all failures return above */
+	req->rl_nchunks = nchunks;
+
+	BUG_ON(nchunks == 0);
+
+	/*
+	 * finish off header. If write, marshal discrim and nchunks.
+	 */
+	if (cur_rchunk) {
+		iptr = (u32 *) cur_rchunk;
+		*iptr++ = xdr_zero;	/* finish the read chunk list */
+		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
+		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
+	} else {
+		warray->wc_discrim = xdr_one;
+		warray->wc_nchunks = htonl(nchunks);
+		iptr = (u32 *) cur_wchunk;
+		if (type == rpcrdma_writech) {
+			*iptr++ = xdr_zero; /* finish the write chunk list */
+			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
+		}
+	}
+
+	/*
+	 * Return header size.
+	 */
+	return (unsigned char *)iptr - (unsigned char *)headerp;
+
+out:
+	for (pos = 0; nchunks--;)
+		pos += rpcrdma_deregister_external(
+				&req->rl_segments[pos], r_xprt, NULL);
+	return 0;
+}
+
+/*
+ * Copy write data inline.
+ * This function is used for "small" requests. Data which is passed
+ * to RPC via iovecs (or page list) is copied directly into the
+ * pre-registered memory buffer for this request. For small amounts
+ * of data, this is efficient. The cutoff value is tunable.
+ */
+static int
+rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+{
+	int i, npages, curlen;
+	int copy_len;
+	unsigned char *srcp, *destp;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+
+	destp = rqst->rq_svec[0].iov_base;
+	curlen = rqst->rq_svec[0].iov_len;
+	destp += curlen;
+	/*
+	 * Do optional padding where it makes sense. Alignment of write
+	 * payload can help the server, if our setting is accurate.
+	 */
+	pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
+	if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
+		pad = 0;	/* don't pad this request */
+
+	dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d\n",
+		__func__, pad, destp, rqst->rq_slen, curlen);
+
+	copy_len = rqst->rq_snd_buf.page_len;
+	r_xprt->rx_stats.pullup_copy_count += copy_len;
+	npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT;
+	for (i = 0; copy_len && i < npages; i++) {
+		if (i == 0)
+			curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
+		else
+			curlen = PAGE_SIZE;
+		if (curlen > copy_len)
+			curlen = copy_len;
+		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
+			__func__, i, destp, copy_len, curlen);
+		srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
+					KM_SKB_SUNRPC_DATA);
+		if (i == 0)
+			memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
+		else
+			memcpy(destp, srcp, curlen);
+		kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
+		rqst->rq_svec[0].iov_len += curlen;
+		destp += curlen;
+		copy_len -= curlen;
+	}
+	if (rqst->rq_snd_buf.tail[0].iov_len) {
+		curlen = rqst->rq_snd_buf.tail[0].iov_len;
+		if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
+			memcpy(destp,
+				rqst->rq_snd_buf.tail[0].iov_base, curlen);
+			r_xprt->rx_stats.pullup_copy_count += curlen;
+		}
+		dprintk("RPC:       %s: tail destp 0x%p len %d curlen %d\n",
+			__func__, destp, copy_len, curlen);
+		rqst->rq_svec[0].iov_len += curlen;
+	}
+	/* header now contains entire send message */
+	return pad;
+}
+
+/*
+ * Marshal a request: the primary job of this routine is to choose
+ * the transfer modes. See comments below.
+ *
+ * Uses multiple RDMA IOVs for a request:
+ *  [0] -- RPC RDMA header, which uses memory from the *start* of the
+ *         preregistered buffer that already holds the RPC data in
+ *         its middle.
+ *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
+ *  [2] -- optional padding.
+ *  [3] -- if padded, header only in [1] and data here.
+ */
+
+int
+rpcrdma_marshal_req(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	char *base;
+	size_t hdrlen, rpclen, padlen;
+	enum rpcrdma_chunktype rtype, wtype;
+	struct rpcrdma_msg *headerp;
+
+	/*
+	 * rpclen gets amount of data in first buffer, which is the
+	 * pre-registered buffer.
+	 */
+	base = rqst->rq_svec[0].iov_base;
+	rpclen = rqst->rq_svec[0].iov_len;
+
+	/* build RDMA header in private area at front */
+	headerp = (struct rpcrdma_msg *) req->rl_base;
+	/* don't htonl XID, it's already done in request */
+	headerp->rm_xid = rqst->rq_xid;
+	headerp->rm_vers = xdr_one;
+	headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
+	headerp->rm_type = __constant_htonl(RDMA_MSG);
+
+	/*
+	 * Chunks needed for results?
+	 *
+	 * o If the expected result is under the inline threshold, all ops
+	 *   return as inline (but see later).
+	 * o Large non-read ops return as a single reply chunk.
+	 * o Large read ops return data as write chunk(s), header as inline.
+	 *
+	 * Note: the NFS code sending down multiple result segments implies
+	 * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
+	 */
+
+	/*
+	 * This code can handle read chunks, write chunks OR reply
+	 * chunks -- only one type. If the request is too big to fit
+	 * inline, then we will choose read chunks. If the request is
+	 * a READ, then use write chunks to separate the file data
+	 * into pages; otherwise use reply chunks.
+	 */
+	if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
+		wtype = rpcrdma_noch;
+	else if (rqst->rq_rcv_buf.page_len == 0)
+		wtype = rpcrdma_replych;
+	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+		wtype = rpcrdma_writech;
+	else
+		wtype = rpcrdma_replych;
+
+	/*
+	 * Chunks needed for arguments?
+	 *
+	 * o If the total request is under the inline threshold, all ops
+	 *   are sent as inline.
+	 * o Large non-write ops are sent with the entire message as a
+	 *   single read chunk (protocol 0-position special case).
+	 * o Large write ops transmit data as read chunk(s), header as
+	 *   inline.
+	 *
+	 * Note: the NFS code sending down multiple argument segments
+	 * implies the op is a write.
+	 * TBD check NFSv4 setacl
+	 */
+	if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+		rtype = rpcrdma_noch;
+	else if (rqst->rq_snd_buf.page_len == 0)
+		rtype = rpcrdma_areadch;
+	else
+		rtype = rpcrdma_readch;
+
+	/* The following simplification is not true forever */
+	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
+		wtype = rpcrdma_noch;
+	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
+
+	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
+	    (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
+		/* forced to "pure inline"? */
+		dprintk("RPC:       %s: too much data (%d/%d) for inline\n",
+			__func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
+		return -1;
+	}
+
+	hdrlen = 28; /*sizeof *headerp;*/
+	padlen = 0;
+
+	/*
+	 * Pull up any extra send data into the preregistered buffer.
+	 * When padding is in use and applies to the transfer, insert
+	 * it and change the message type.
+	 */
+	if (rtype == rpcrdma_noch) {
+
+		padlen = rpcrdma_inline_pullup(rqst,
+						RPCRDMA_INLINE_PAD_VALUE(rqst));
+
+		if (padlen) {
+			headerp->rm_type = __constant_htonl(RDMA_MSGP);
+			headerp->rm_body.rm_padded.rm_align =
+				htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+			headerp->rm_body.rm_padded.rm_thresh =
+				__constant_htonl(RPCRDMA_INLINE_PAD_THRESH);
+			headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
+			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
+			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
+			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
+			BUG_ON(wtype != rpcrdma_noch);
+
+		} else {
+			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+			headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+			/* new length after pullup */
+			rpclen = rqst->rq_svec[0].iov_len;
+			/*
+			 * Currently we try to not actually use read inline.
+			 * Reply chunks have the desirable property that
+			 * they land, packed, directly in the target buffers
+			 * without headers, so they require no fixup. The
+			 * additional RDMA Write op sends the same amount
+			 * of data, streams on-the-wire and adds no overhead
+			 * on receive. Therefore, we request a reply chunk
+			 * for non-writes wherever feasible and efficient.
+			 */
+			if (wtype == rpcrdma_noch &&
+			    r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
+				wtype = rpcrdma_replych;
+		}
+	}
+
+	/*
+	 * Marshal chunks. This routine will return the header length
+	 * consumed by marshaling.
+	 */
+	if (rtype != rpcrdma_noch) {
+		hdrlen = rpcrdma_create_chunks(rqst,
+					&rqst->rq_snd_buf, headerp, rtype);
+		wtype = rtype;	/* simplify dprintk */
+
+	} else if (wtype != rpcrdma_noch) {
+		hdrlen = rpcrdma_create_chunks(rqst,
+					&rqst->rq_rcv_buf, headerp, wtype);
+	}
+
+	if (hdrlen == 0)
+		return -1;
+
+	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
+		"                   headerp 0x%p base 0x%p lkey 0x%x\n",
+		__func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+		headerp, base, req->rl_iov.lkey);
+
+	/*
+	 * initialize send_iov's - normally only two: rdma chunk header and
+	 * single preregistered RPC header buffer, but if padding is present,
+	 * then use a preregistered (and zeroed) pad buffer between the RPC
+	 * header and any write data. In all non-rdma cases, any following
+	 * data has been copied into the RPC header buffer.
+	 */
+	req->rl_send_iov[0].addr = req->rl_iov.addr;
+	req->rl_send_iov[0].length = hdrlen;
+	req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+
+	req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+	req->rl_send_iov[1].length = rpclen;
+	req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+
+	req->rl_niovs = 2;
+
+	if (padlen) {
+		struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+		req->rl_send_iov[2].addr = ep->rep_pad.addr;
+		req->rl_send_iov[2].length = padlen;
+		req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+
+		req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
+		req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
+		req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+
+		req->rl_niovs = 4;
+	}
+
+	return 0;
+}
+
+/*
+ * Chase down a received write or reply chunklist to get length
+ * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
+ */
+static int
+rpcrdma_count_chunks(struct rpcrdma_rep *rep, int max, int wrchunk, u32 **iptrp)
+{
+	unsigned int i, total_len;
+	struct rpcrdma_write_chunk *cur_wchunk;
+
+	i = ntohl(**iptrp);	/* get array count */
+	if (i > max)
+		return -1;
+	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
+	total_len = 0;
+	while (i--) {
+		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
+		ifdebug(FACILITY) {
+			u64 off;
+			xdr_decode_hyper((u32 *)&seg->rs_offset, &off);
+			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
+				__func__,
+				ntohl(seg->rs_length),
+				off,
+				ntohl(seg->rs_handle));
+		}
+		total_len += ntohl(seg->rs_length);
+		++cur_wchunk;
+	}
+	/* check and adjust for properly terminated write chunk */
+	if (wrchunk) {
+		u32 *w = (u32 *) cur_wchunk;
+		if (*w++ != xdr_zero)
+			return -1;
+		cur_wchunk = (struct rpcrdma_write_chunk *) w;
+	}
+	if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+		return -1;
+
+	*iptrp = (u32 *) cur_wchunk;
+	return total_len;
+}
+
+/*
+ * Scatter inline received data back into provided iov's.
+ */
+static void
+rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
+{
+	int i, npages, curlen, olen;
+	char *destp;
+
+	curlen = rqst->rq_rcv_buf.head[0].iov_len;
+	if (curlen > copy_len) {	/* write chunk header fixup */
+		curlen = copy_len;
+		rqst->rq_rcv_buf.head[0].iov_len = curlen;
+	}
+
+	dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
+		__func__, srcp, copy_len, curlen);
+
+	/* Shift pointer for first receive segment only */
+	rqst->rq_rcv_buf.head[0].iov_base = srcp;
+	srcp += curlen;
+	copy_len -= curlen;
+
+	olen = copy_len;
+	i = 0;
+	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
+	if (copy_len && rqst->rq_rcv_buf.page_len) {
+		npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base +
+			rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
+		for (; i < npages; i++) {
+			if (i == 0)
+				curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base;
+			else
+				curlen = PAGE_SIZE;
+			if (curlen > copy_len)
+				curlen = copy_len;
+			dprintk("RPC:       %s: page %d"
+				" srcp 0x%p len %d curlen %d\n",
+				__func__, i, srcp, copy_len, curlen);
+			destp = kmap_atomic(rqst->rq_rcv_buf.pages[i],
+						KM_SKB_SUNRPC_DATA);
+			if (i == 0)
+				memcpy(destp + rqst->rq_rcv_buf.page_base,
+						srcp, curlen);
+			else
+				memcpy(destp, srcp, curlen);
+			flush_dcache_page(rqst->rq_rcv_buf.pages[i]);
+			kunmap_atomic(destp, KM_SKB_SUNRPC_DATA);
+			srcp += curlen;
+			copy_len -= curlen;
+			if (copy_len == 0)
+				break;
+		}
+		rqst->rq_rcv_buf.page_len = olen - copy_len;
+	} else
+		rqst->rq_rcv_buf.page_len = 0;
+
+	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
+		curlen = copy_len;
+		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
+			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
+		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
+			memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
+		dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
+			__func__, srcp, copy_len, curlen);
+		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
+		copy_len -= curlen; ++i;
+	} else
+		rqst->rq_rcv_buf.tail[0].iov_len = 0;
+
+	if (copy_len)
+		dprintk("RPC:       %s: %d bytes in"
+			" %d extra segments (%d lost)\n",
+			__func__, olen, i, copy_len);
+
+	/* TBD avoid a warning from call_decode() */
+	rqst->rq_private_buf = rqst->rq_rcv_buf;
+}
+
+/*
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
+ */
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
+{
+	struct rpc_xprt *xprt = ep->rep_xprt;
+
+	spin_lock_bh(&xprt->transport_lock);
+	if (ep->rep_connected > 0) {
+		if (!xprt_test_and_set_connected(xprt))
+			xprt_wake_pending_tasks(xprt, 0);
+	} else {
+		if (xprt_test_and_clear_connected(xprt))
+			xprt_wake_pending_tasks(xprt, ep->rep_connected);
+	}
+	spin_unlock_bh(&xprt->transport_lock);
+}
+
+/*
+ * This function is called when a memory-window unbind that we are waiting
+ * for completes. Just use rr_func (zeroed by upcall) to signal completion.
+ */
+static void
+rpcrdma_unbind_func(struct rpcrdma_rep *rep)
+{
+	wake_up(&rep->rr_unbind);
+}
+
+/*
+ * Called as a tasklet to do req/reply match and complete a request
+ * Errors must result in the RPC task either being awakened, or
+ * allowed to timeout, to discover the errors at that time.
+ */
+void
+rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_msg *headerp;
+	struct rpcrdma_req *req;
+	struct rpc_rqst *rqst;
+	struct rpc_xprt *xprt = rep->rr_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	u32 *iptr;
+	int i, rdmalen, status;
+
+	/* Check status. If bad, signal disconnect and return rep to pool */
+	if (rep->rr_len == ~0U) {
+		rpcrdma_recv_buffer_put(rep);
+		if (r_xprt->rx_ep.rep_connected == 1) {
+			r_xprt->rx_ep.rep_connected = -EIO;
+			rpcrdma_conn_func(&r_xprt->rx_ep);
+		}
+		return;
+	}
+	if (rep->rr_len < 28) {
+		dprintk("RPC:       %s: short/invalid reply\n", __func__);
+		goto repost;
+	}
+	headerp = (struct rpcrdma_msg *) rep->rr_base;
+	if (headerp->rm_vers != xdr_one) {
+		dprintk("RPC:       %s: invalid version %d\n",
+			__func__, ntohl(headerp->rm_vers));
+		goto repost;
+	}
+
+	/* Get XID and try for a match. */
+	spin_lock(&xprt->transport_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (rqst == NULL) {
+		spin_unlock(&xprt->transport_lock);
+		dprintk("RPC:       %s: reply 0x%p failed "
+			"to match any request xid 0x%08x len %d\n",
+			__func__, rep, headerp->rm_xid, rep->rr_len);
+repost:
+		r_xprt->rx_stats.bad_reply_count++;
+		rep->rr_func = rpcrdma_reply_handler;
+		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+			rpcrdma_recv_buffer_put(rep);
+
+		return;
+	}
+
+	/* get request object */
+	req = rpcr_to_rdmar(rqst);
+
+	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
+		"                   RPC request 0x%p xid 0x%08x\n",
+			__func__, rep, req, rqst, headerp->rm_xid);
+
+	BUG_ON(!req || req->rl_reply);
+
+	/* from here on, the reply is no longer an orphan */
+	req->rl_reply = rep;
+
+	/* check for expected message types */
+	/* The order of some of these tests is important. */
+	switch (headerp->rm_type) {
+	case __constant_htonl(RDMA_MSG):
+		/* never expect read chunks */
+		/* never expect reply chunks (two ways to check) */
+		/* never expect write chunks without having offered RDMA */
+		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
+		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
+		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
+		     req->rl_nchunks == 0))
+			goto badheader;
+		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
+			/* count any expected write chunks in read reply */
+			/* start at write chunk array count */
+			iptr = &headerp->rm_body.rm_chunks[2];
+			rdmalen = rpcrdma_count_chunks(rep,
+						req->rl_nchunks, 1, &iptr);
+			/* check for validity, and no reply chunk after */
+			if (rdmalen < 0 || *iptr++ != xdr_zero)
+				goto badheader;
+			rep->rr_len -=
+			    ((unsigned char *)iptr - (unsigned char *)headerp);
+			status = rep->rr_len + rdmalen;
+			r_xprt->rx_stats.total_rdma_reply += rdmalen;
+		} else {
+			/* else ordinary inline */
+			iptr = (u32 *)((unsigned char *)headerp + 28);
+			rep->rr_len -= 28; /*sizeof *headerp;*/
+			status = rep->rr_len;
+		}
+		/* Fix up the rpc results for upper layer */
+		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
+		break;
+
+	case __constant_htonl(RDMA_NOMSG):
+		/* never expect read or write chunks, always reply chunks */
+		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
+		    headerp->rm_body.rm_chunks[2] != xdr_one ||
+		    req->rl_nchunks == 0)
+			goto badheader;
+		iptr = (u32 *)((unsigned char *)headerp + 28);
+		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
+		if (rdmalen < 0)
+			goto badheader;
+		r_xprt->rx_stats.total_rdma_reply += rdmalen;
+		/* Reply chunk buffer already is the reply vector - no fixup. */
+		status = rdmalen;
+		break;
+
+badheader:
+	default:
+		dprintk("%s: invalid rpcrdma reply header (type %d):"
+				" chunks[012] == %d %d %d"
+				" expected chunks <= %d\n",
+				__func__, ntohl(headerp->rm_type),
+				headerp->rm_body.rm_chunks[0],
+				headerp->rm_body.rm_chunks[1],
+				headerp->rm_body.rm_chunks[2],
+				req->rl_nchunks);
+		status = -EIO;
+		r_xprt->rx_stats.bad_reply_count++;
+		break;
+	}
+
+	/* If using mw bind, start the deregister process now. */
+	/* (Note: if mr_free(), cannot perform it here, in tasklet context) */
+	if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
+	case RPCRDMA_MEMWINDOWS:
+		for (i = 0; req->rl_nchunks-- > 1;)
+			i += rpcrdma_deregister_external(
+				&req->rl_segments[i], r_xprt, NULL);
+		/* Optionally wait (not here) for unbinds to complete */
+		rep->rr_func = rpcrdma_unbind_func;
+		(void) rpcrdma_deregister_external(&req->rl_segments[i],
+						   r_xprt, rep);
+		break;
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+		for (i = 0; req->rl_nchunks--;)
+			i += rpcrdma_deregister_external(&req->rl_segments[i],
+							 r_xprt, NULL);
+		break;
+	default:
+		break;
+	}
+
+	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
+			__func__, xprt, rqst, status);
+	xprt_complete_rqst(rqst->rq_task, status);
+	spin_unlock(&xprt->transport_lock);
+}
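
The "Encoding key" comment in rpc_rdma.c above compresses the wire format
into "1 - PHLOO - ... - 0" and friends, where HLOO is Handle32 Length32
Offset64. As a concrete illustration, here is how a one-element read
chunk list lands in the header, using hypothetical stand-ins for the
rpcrdma_read_chunk fields (seven 32-bit XDR words in all):

	#include <stdint.h>
	#include <arpa/inet.h>		/* htonl() */

	/* HLOO: Handle32 Length32 Offset64, all big-endian on the wire. */
	struct hloo {
		uint32_t handle;	/* H: rkey the server uses for RDMA Read */
		uint32_t length;	/* L: number of bytes in this segment */
		uint32_t offset_hi;	/* O: registered address, high 32 bits */
		uint32_t offset_lo;	/* O: low 32 bits */
	};

	/* Emit "1 - PHLOO - 0": one read chunk at XDR position pos. */
	static int encode_one_read_chunk(uint32_t *iptr, uint32_t pos,
					 uint32_t rkey, uint32_t len,
					 uint64_t addr)
	{
		struct hloo *t;

		*iptr++ = htonl(1);	/* discriminator: an entry follows */
		*iptr++ = htonl(pos);	/* P: byte position in the RPC call */
		t = (struct hloo *)iptr;
		t->handle    = htonl(rkey);
		t->length    = htonl(len);
		t->offset_hi = htonl((uint32_t)(addr >> 32));
		t->offset_lo = htonl((uint32_t)addr);
		iptr += 4;
		*iptr = htonl(0);	/* terminate the read chunk list */
		return 7;		/* 32-bit words consumed */
	}

rpcrdma_create_chunks() then appends two more zero words (a NULL write
list and a NULL reply chunk) before returning the header length.
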
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
new file mode 100644
index 0000000..dc55cc9
--- /dev/null
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -0,0 +1,800 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * transport.c
+ *
+ * This file contains the top-level implementation of an RPC RDMA
+ * transport.
+ *
+ * Naming convention: functions beginning with xprt_ are part of the
+ * transport switch. All others are RPC RDMA internal.
+ */
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/seq_file.h>
+
+#include "xprt_rdma.h"
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+MODULE_LICENSE("Dual BSD/GPL");
+
+MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
+MODULE_AUTHOR("Network Appliance, Inc.");
+
+/*
+ * tunables
+ */
+
+static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
+static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
+static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
+static unsigned int xprt_rdma_inline_write_padding;
+#if !RPCRDMA_PERSISTENT_REGISTRATION
+static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
+#else
+static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
+#endif
+
+#ifdef RPC_DEBUG
+
+static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
+static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
+static unsigned int zero;
+static unsigned int max_padding = PAGE_SIZE;
+static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
+static unsigned int max_memreg = RPCRDMA_LAST - 1;
+
+static struct ctl_table_header *sunrpc_table_header;
+
+static ctl_table xr_tunables_table[] = {
+	{
+		.ctl_name	= CTL_SLOTTABLE_RDMA,
+		.procname	= "rdma_slot_table_entries",
+		.data		= &xprt_rdma_slot_table_entries,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_slot_table_size,
+		.extra2		= &max_slot_table_size
+	},
+	{
+		.ctl_name	= CTL_RDMA_MAXINLINEREAD,
+		.procname	= "rdma_max_inline_read",
+		.data		= &xprt_rdma_max_inline_read,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+		.strategy	= &sysctl_intvec,
+	},
+	{
+		.ctl_name	= CTL_RDMA_MAXINLINEWRITE,
+		.procname	= "rdma_max_inline_write",
+		.data		= &xprt_rdma_max_inline_write,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+		.strategy	= &sysctl_intvec,
+	},
+	{
+		.ctl_name	= CTL_RDMA_WRITEPADDING,
+		.procname	= "rdma_inline_write_padding",
+		.data		= &xprt_rdma_inline_write_padding,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &zero,
+		.extra2		= &max_padding,
+	},
+	{
+		.ctl_name	= CTL_RDMA_MEMREG,
+		.procname	= "rdma_memreg_strategy",
+		.data		= &xprt_rdma_memreg_strategy,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_memreg,
+		.extra2		= &max_memreg,
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+static ctl_table sunrpc_table[] = {
+	{
+		.ctl_name	= CTL_SUNRPC,
+		.procname	= "sunrpc",
+		.mode		= 0555,
+		.child		= xr_tunables_table
+	},
+	{
+		.ctl_name = 0,
+	},
+};
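+
+/*
+ * Once registered (see xprt_rdma_init below), these tunables appear
+ * under /proc/sys/sunrpc/, e.g. /proc/sys/sunrpc/rdma_slot_table_entries
+ * (illustrative path, derived from the table names above).
+ */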
+
+#endif
+
+static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */
+
+static void
+xprt_rdma_format_addresses(struct rpc_xprt *xprt)
+{
+	struct sockaddr_in *addr = (struct sockaddr_in *)
+					&rpcx_to_rdmad(xprt).addr;
+	char *buf;
+
+	buf = kzalloc(20, GFP_KERNEL);
+	if (buf)
+		snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
+	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf)
+		snprintf(buf, 8, "%u", ntohs(addr->sin_port));
+	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
+
+	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
+
+	buf = kzalloc(48, GFP_KERNEL);
+	if (buf)
+		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
+			NIPQUAD(addr->sin_addr.s_addr),
+			ntohs(addr->sin_port), "rdma");
+	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
+
+	buf = kzalloc(10, GFP_KERNEL);
+	if (buf)
+		snprintf(buf, 10, "%02x%02x%02x%02x",
+			NIPQUAD(addr->sin_addr.s_addr));
+	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf)
+		snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
+	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
+
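+	/* The "universal address" below appends the port as two decimal
+	 * octets after the dotted quad; e.g. port 2049 (0x0801) renders
+	 * as "...8.1". */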
+	buf = kzalloc(30, GFP_KERNEL);
+	if (buf)
+		snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
+			NIPQUAD(addr->sin_addr.s_addr),
+			ntohs(addr->sin_port) >> 8,
+			ntohs(addr->sin_port) & 0xff);
+	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
+
+	/* netid */
+	xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
+}
+
+static void
+xprt_rdma_free_addresses(struct rpc_xprt *xprt)
+{
+	kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
+	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
+	kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
+	kfree(xprt->address_strings[RPC_DISPLAY_HEX_ADDR]);
+	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
+	kfree(xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR]);
+}
+
+static void
+xprt_rdma_connect_worker(struct work_struct *work)
+{
+	struct rpcrdma_xprt *r_xprt =
+		container_of(work, struct rpcrdma_xprt, rdma_connect.work);
+	struct rpc_xprt *xprt = &r_xprt->xprt;
+	int rc = 0;
+
+	if (!xprt->shutdown) {
+		xprt_clear_connected(xprt);
+
+		dprintk("RPC:       %s: %sconnect\n", __func__,
+				r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
+		rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+		if (rc)
+			goto out;
+	}
+	goto out_clear;
+
+out:
+	xprt_wake_pending_tasks(xprt, rc);
+
+out_clear:
+	dprintk("RPC:       %s: exit\n", __func__);
+	xprt_clear_connecting(xprt);
+}
+
+/*
+ * xprt_rdma_destroy
+ *
+ * Destroy the xprt.
+ * Free all memory associated with the object, including its own.
+ * NOTE: none of the *destroy methods free memory for their top-level
+ * objects, even though they may have allocated it (they do free
+ * private memory). It's up to the caller to handle it. In this
+ * case (RDMA transport), all structure memory is inlined with the
+ * struct rpcrdma_xprt.
+ */
+static void
+xprt_rdma_destroy(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	int rc;
+
+	dprintk("RPC:       %s: called\n", __func__);
+
+	cancel_delayed_work(&r_xprt->rdma_connect);
+	flush_scheduled_work();
+
+	xprt_clear_connected(xprt);
+
+	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
+	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
+	if (rc)
+		dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
+			__func__, rc);
+	rpcrdma_ia_close(&r_xprt->rx_ia);
+
+	xprt_rdma_free_addresses(xprt);
+
+	kfree(xprt->slot);
+	xprt->slot = NULL;
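+	/* All transport structure memory is inlined with the
+	 * struct rpcrdma_xprt (see the note above), so this single
+	 * kfree releases the whole object. */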
+	kfree(xprt);
+
+	dprintk("RPC:       %s: returning\n", __func__);
+
+	module_put(THIS_MODULE);
+}
+
+/**
+ * xprt_setup_rdma - Set up transport to use RDMA
+ *
+ * @args: rpc transport arguments
+ */
+static struct rpc_xprt *
+xprt_setup_rdma(struct xprt_create *args)
+{
+	struct rpcrdma_create_data_internal cdata;
+	struct rpc_xprt *xprt;
+	struct rpcrdma_xprt *new_xprt;
+	struct rpcrdma_ep *new_ep;
+	struct sockaddr_in *sin;
+	int rc;
+
+	if (args->addrlen > sizeof(xprt->addr)) {
+		dprintk("RPC:       %s: address too large\n", __func__);
+		return ERR_PTR(-EBADF);
+	}
+
+	xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
+	if (xprt == NULL) {
+		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
+			__func__);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	xprt->max_reqs = xprt_rdma_slot_table_entries;
+	xprt->slot = kcalloc(xprt->max_reqs,
+				sizeof(struct rpc_rqst), GFP_KERNEL);
+	if (xprt->slot == NULL) {
+		kfree(xprt);
+		dprintk("RPC:       %s: couldn't allocate %d slots\n",
+			__func__, xprt->max_reqs);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	/* 60 second timeout, no retries */
+	xprt_set_timeout(&xprt->timeout, 0, 60UL * HZ);
+	xprt->bind_timeout = (60U * HZ);
+	xprt->connect_timeout = (60U * HZ);
+	xprt->reestablish_timeout = (5U * HZ);
+	xprt->idle_timeout = (5U * 60 * HZ);
+
+	xprt->resvport = 0;		/* privileged port not needed */
+	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
+	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
+	xprt->ops = &xprt_rdma_procs;
+
+	/*
+	 * Set up RDMA-specific connect data.
+	 */
+
+	/* Put server RDMA address in local cdata */
+	memcpy(&cdata.addr, args->dstaddr, args->addrlen);
+
+	/* Ensure xprt->addr holds valid server TCP (not RDMA)
+	 * address, for any side protocols which peek at it */
+	xprt->prot = IPPROTO_TCP;
+	xprt->addrlen = args->addrlen;
+	memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
+
+	sin = (struct sockaddr_in *)&cdata.addr;
+	if (ntohs(sin->sin_port) != 0)
+		xprt_set_bound(xprt);
+
+	dprintk("RPC:       %s: %u.%u.%u.%u:%u\n", __func__,
+			NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
+
+	/* Set max requests */
+	cdata.max_requests = xprt->max_reqs;
+
+	/* Set some length limits */
+	cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
+	cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
+
+	cdata.inline_wsize = xprt_rdma_max_inline_write;
+	if (cdata.inline_wsize > cdata.wsize)
+		cdata.inline_wsize = cdata.wsize;
+
+	cdata.inline_rsize = xprt_rdma_max_inline_read;
+	if (cdata.inline_rsize > cdata.rsize)
+		cdata.inline_rsize = cdata.rsize;
+
+	cdata.padding = xprt_rdma_inline_write_padding;
+
+	/*
+	 * Create new transport instance, which includes initialized
+	 *  o ia
+	 *  o endpoint
+	 *  o buffers
+	 */
+
+	new_xprt = rpcx_to_rdmax(xprt);
+
+	rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
+				xprt_rdma_memreg_strategy);
+	if (rc)
+		goto out1;
+
+	/*
+	 * initialize and create ep
+	 */
+	new_xprt->rx_data = cdata;
+	new_ep = &new_xprt->rx_ep;
+	new_ep->rep_remote_addr = cdata.addr;
+
+	rc = rpcrdma_ep_create(&new_xprt->rx_ep,
+				&new_xprt->rx_ia, &new_xprt->rx_data);
+	if (rc)
+		goto out2;
+
+	/*
+	 * Allocate pre-registered send and receive buffers for headers and
+	 * any inline data. Also specify any padding which will be provided
+	 * from a preregistered zero buffer.
+	 */
+	rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
+				&new_xprt->rx_data);
+	if (rc)
+		goto out3;
+
+	/*
+	 * Register a callback for connection events. This is necessary because
+	 * connection loss notification is async. We also catch connection loss
+	 * when reaping receives.
+	 */
+	INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
+	new_ep->rep_func = rpcrdma_conn_func;
+	new_ep->rep_xprt = xprt;
+
+	xprt_rdma_format_addresses(xprt);
+
+	if (!try_module_get(THIS_MODULE))
+		goto out4;
+
+	return xprt;
+
+out4:
+	xprt_rdma_free_addresses(xprt);
+	rc = -EINVAL;
+out3:
+	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+out2:
+	rpcrdma_ia_close(&new_xprt->rx_ia);
+out1:
+	kfree(xprt->slot);
+	kfree(xprt);
+	return ERR_PTR(rc);
+}
+
+/*
+ * Close a connection, during shutdown or timeout/reconnect
+ */
+static void
+xprt_rdma_close(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	dprintk("RPC:       %s: closing\n", __func__);
+	xprt_disconnect(xprt);
+	(void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+}
+
+static void
+xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
+{
+	struct sockaddr_in *sap;
+
+	sap = (struct sockaddr_in *)&xprt->addr;
+	sap->sin_port = htons(port);
+	sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
+	sap->sin_port = htons(port);
+	dprintk("RPC:       %s: %u\n", __func__, port);
+}
+
+static void
+xprt_rdma_connect(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	if (!xprt_test_and_set_connecting(xprt)) {
+		if (r_xprt->rx_ep.rep_connected != 0) {
+			/* Reconnect */
+			schedule_delayed_work(&r_xprt->rdma_connect,
+				xprt->reestablish_timeout);
+		} else {
+			schedule_delayed_work(&r_xprt->rdma_connect, 0);
+			if (!RPC_IS_ASYNC(task))
+				flush_scheduled_work();
+		}
+	}
+}
+
+static int
+xprt_rdma_reserve_xprt(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
+
+	/* rb_cwndscale equals RPC_CWNDSCALE at init, but capture it only *after* setup */
+	if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
+		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
+		dprintk("RPC:       %s: cwndscale %lu\n", __func__,
+			r_xprt->rx_buf.rb_cwndscale);
+		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
+	}
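+	/* Scale the congestion window by the credits the server advertised */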
+	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
+	return xprt_reserve_xprt_cong(task);
+}
+
+/*
+ * The RDMA allocate/free functions need the task structure as a place
+ * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
+ * sequence. For this reason, the recv buffers are attached to send
+ * buffers for portions of the RPC. Note that the RPC layer allocates
+ * both send and receive buffers in the same call. We may register
+ * the receive buffer portion when using reply chunks.
+ */
+static void *
+xprt_rdma_allocate(struct rpc_task *task, size_t size)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+	struct rpcrdma_req *req, *nreq;
+
+	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+	BUG_ON(req == NULL);
+
+	if (size > req->rl_size) {
+		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
+			"prog %d vers %d proc %d\n",
+			__func__, size, req->rl_size,
+			task->tk_client->cl_prog, task->tk_client->cl_vers,
+			task->tk_msg.rpc_proc->p_proc);
+		/*
+		 * Outgoing length shortage. Our inline write max must have
+		 * been configured to perform direct i/o.
+		 *
+		 * This is therefore a large metadata operation, and the
+		 * allocate call was made on the maximum possible message,
+		 * e.g. containing long filename(s) or symlink data. In
+		 * fact, while these metadata operations *might* carry
+		 * large outgoing payloads, they rarely *do*. However, we
+		 * have to commit to the request here, so reallocate and
+		 * register it now. The data path will never require this
+		 * reallocation.
+		 *
+		 * If the allocation or registration fails, the RPC framework
+		 * will (doggedly) retry.
+		 */
+		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
+				RPCRDMA_BOUNCEBUFFERS) {
+			/* forced to "pure inline" */
+			dprintk("RPC:       %s: too much data (%zd) for inline "
+					"(r/w max %d/%d)\n", __func__, size,
+					rpcx_to_rdmad(xprt).inline_rsize,
+					rpcx_to_rdmad(xprt).inline_wsize);
+			size = req->rl_size;
+			rpc_exit(task, -EIO);		/* fail the operation */
+			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+			goto out;
+		}
+		if (task->tk_flags & RPC_TASK_SWAPPER)
+			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
+		else
+			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
+		if (nreq == NULL)
+			goto outfail;
+
+		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
+				nreq->rl_base, size + sizeof(struct rpcrdma_req)
+				- offsetof(struct rpcrdma_req, rl_base),
+				&nreq->rl_handle, &nreq->rl_iov)) {
+			kfree(nreq);
+			goto outfail;
+		}
+		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
+		nreq->rl_size = size;
+		nreq->rl_niovs = 0;
+		nreq->rl_nchunks = 0;
+		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
+		nreq->rl_reply = req->rl_reply;
+		memcpy(nreq->rl_segments,
+			req->rl_segments, sizeof nreq->rl_segments);
+		/* flag the swap with an unused field */
+		nreq->rl_iov.length = 0;
+		req->rl_reply = NULL;
+		req = nreq;
+	}
+	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
+out:
+	return req->rl_xdr_buf;
+
+outfail:
+	rpcrdma_buffer_put(req);
+	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+	return NULL;
+}
+
+/*
+ * This function returns all RDMA resources to the pool.
+ */
+static void
+xprt_rdma_free(void *buffer)
+{
+	struct rpcrdma_req *req;
+	struct rpcrdma_xprt *r_xprt;
+	struct rpcrdma_rep *rep;
+	int i;
+
+	if (buffer == NULL)
+		return;
+
+	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
+	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
+	rep = req->rl_reply;
+
+	dprintk("RPC:       %s: called on 0x%p%s\n",
+		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+
+	/*
+	 * Finish the deregistration. When using mw bind, this was
+	 * begun in rpcrdma_reply_handler(). In all other modes, we
+	 * do it here, in thread context. The process is considered
+	 * complete once the rr_func vector becomes NULL (it is set
+	 * during rpcrdma_reply_handler()), so the wait call below
+	 * will not block if the dereg is already "done". If we are
+	 * interrupted, our framework will clean up.
+	 */
+	for (i = 0; req->rl_nchunks;) {
+		--req->rl_nchunks;
+		i += rpcrdma_deregister_external(
+			&req->rl_segments[i], r_xprt, NULL);
+	}
+
+	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
+		rep->rr_func = NULL;	/* abandon the callback */
+		req->rl_reply = NULL;
+	}
+
+	if (req->rl_iov.length == 0) {	/* see allocate above */
+		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
+		oreq->rl_reply = req->rl_reply;
+		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
+						   req->rl_handle,
+						   &req->rl_iov);
+		kfree(req);
+		req = oreq;
+	}
+
+	/* Put back request+reply buffers */
+	rpcrdma_buffer_put(req);
+}
+
+/*
+ * send_request invokes the meat of RPC RDMA. It must do the following:
+ *  1.  Marshal the RPC request into an RPC RDMA request, which means
+ *	putting a header in front of data, and creating IOVs for RDMA
+ *	from those in the request.
+ *  2.  In marshaling, detect opportunities for RDMA, and use them.
+ *  3.  Post a recv message to set up async completion, then send
+ *	the request (rpcrdma_ep_post).
+ *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
+ */
+
+static int
+xprt_rdma_send_request(struct rpc_task *task)
+{
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct rpc_xprt *xprt = task->tk_xprt;
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	/* marshal the send itself */
+	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
+		r_xprt->rx_stats.failed_marshal_count++;
+		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
+			__func__);
+		return -EIO;
+	}
+
+	if (req->rl_reply == NULL) 		/* e.g. reconnection */
+		rpcrdma_recv_buffer_get(req);
+
+	if (req->rl_reply) {
+		req->rl_reply->rr_func = rpcrdma_reply_handler;
+		/* this need only be done once, but... */
+		req->rl_reply->rr_xprt = xprt;
+	}
+
+	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
+		xprt_disconnect(xprt);
+		return -ENOTCONN;	/* implies disconnect */
+	}
+
+	rqst->rq_bytes_sent = 0;
+	return 0;
+}
+
+static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	long idle_time = 0;
+
+	if (xprt_connected(xprt))
+		idle_time = (long)(jiffies - xprt->last_used) / HZ;
+
+	seq_printf(seq,
+	  "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
+	  "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
+
+	   0,	/* need a local port? */
+	   xprt->stat.bind_count,
+	   xprt->stat.connect_count,
+	   xprt->stat.connect_time,
+	   idle_time,
+	   xprt->stat.sends,
+	   xprt->stat.recvs,
+	   xprt->stat.bad_xids,
+	   xprt->stat.req_u,
+	   xprt->stat.bklog_u,
+
+	   r_xprt->rx_stats.read_chunk_count,
+	   r_xprt->rx_stats.write_chunk_count,
+	   r_xprt->rx_stats.reply_chunk_count,
+	   r_xprt->rx_stats.total_rdma_request,
+	   r_xprt->rx_stats.total_rdma_reply,
+	   r_xprt->rx_stats.pullup_copy_count,
+	   r_xprt->rx_stats.fixup_copy_count,
+	   r_xprt->rx_stats.hardway_register_count,
+	   r_xprt->rx_stats.failed_marshal_count,
+	   r_xprt->rx_stats.bad_reply_count);
+}
+
+/*
+ * Plumbing for rpc transport switch and kernel module
+ */
+
+static struct rpc_xprt_ops xprt_rdma_procs = {
+	.reserve_xprt		= xprt_rdma_reserve_xprt,
+	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
+	.release_request	= xprt_release_rqst_cong,       /* ditto */
+	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
+	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
+	.set_port		= xprt_rdma_set_port,
+	.connect		= xprt_rdma_connect,
+	.buf_alloc		= xprt_rdma_allocate,
+	.buf_free		= xprt_rdma_free,
+	.send_request		= xprt_rdma_send_request,
+	.close			= xprt_rdma_close,
+	.destroy		= xprt_rdma_destroy,
+	.print_stats		= xprt_rdma_print_stats
+};
+
+static struct xprt_class xprt_rdma = {
+	.list			= LIST_HEAD_INIT(xprt_rdma.list),
+	.name			= "rdma",
+	.owner			= THIS_MODULE,
+	.ident			= XPRT_TRANSPORT_RDMA,
+	.setup			= xprt_setup_rdma,
+};
+
+static void __exit xprt_rdma_cleanup(void)
+{
+	int rc;
+
+	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
+#ifdef RPC_DEBUG
+	if (sunrpc_table_header) {
+		unregister_sysctl_table(sunrpc_table_header);
+		sunrpc_table_header = NULL;
+	}
+#endif
+	rc = xprt_unregister_transport(&xprt_rdma);
+	if (rc)
+		dprintk("RPC:       %s: xprt_unregister returned %i\n",
+			__func__, rc);
+}
+
+static int __init xprt_rdma_init(void)
+{
+	int rc;
+
+	rc = xprt_register_transport(&xprt_rdma);
+
+	if (rc)
+		return rc;
+
+	dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");
+
+	dprintk(KERN_INFO "Defaults:\n");
+	dprintk(KERN_INFO "\tSlots %d\n"
+		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
+		xprt_rdma_slot_table_entries,
+		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
+	dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
+		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
+
+#ifdef RPC_DEBUG
+	if (!sunrpc_table_header)
+		sunrpc_table_header = register_sysctl_table(sunrpc_table);
+#endif
+	return 0;
+}
+
+module_init(xprt_rdma_init);
+module_exit(xprt_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
new file mode 100644
index 0000000..9ec8ca4
--- /dev/null
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -0,0 +1,1626 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * verbs.c
+ *
+ * Encapsulates the major functions managing:
+ *  o adapters
+ *  o endpoints
+ *  o connections
+ *  o buffer memory
+ */
+
+#include <linux/pci.h>	/* for Tavor hack below */
+
+#include "xprt_rdma.h"
+
+/*
+ * Globals/Macros
+ */
+
+#ifdef RPC_DEBUG
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+/*
+ * internal functions
+ */
+
+/*
+ * handle replies in tasklet context, using a single, global list
+ * rdma tasklet function -- just turn around and call the func
+ * for all replies on the list
+ */
+
+static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
+static LIST_HEAD(rpcrdma_tasklets_g);
+
+static void
+rpcrdma_run_tasklet(unsigned long data)
+{
+	struct rpcrdma_rep *rep;
+	void (*func)(struct rpcrdma_rep *);
+	unsigned long flags;
+
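+	/* the "data" argument is unused; the self-assignment below keeps
+	 * it referenced and quiets compiler warnings */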
+	data = data;
+	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+	while (!list_empty(&rpcrdma_tasklets_g)) {
+		rep = list_entry(rpcrdma_tasklets_g.next,
+				 struct rpcrdma_rep, rr_list);
+		list_del(&rep->rr_list);
+		func = rep->rr_func;
+		rep->rr_func = NULL;
+		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+
+		if (func)
+			func(rep);
+		else
+			rpcrdma_recv_buffer_put(rep);
+
+		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+	}
+	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+}
+
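+/* A single global tasklet serializes all reply processing */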
+static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
+
+static inline void
+rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
+	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+	tasklet_schedule(&rpcrdma_tasklet_g);
+}
+
+static void
+rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+{
+	struct rpcrdma_ep *ep = context;
+
+	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
+		__func__, event->event, event->device->name, context);
+	if (ep->rep_connected == 1) {
+		ep->rep_connected = -EIO;
+		ep->rep_func(ep);
+		wake_up_all(&ep->rep_connect_wait);
+	}
+}
+
+static void
+rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
+{
+	struct rpcrdma_ep *ep = context;
+
+	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
+		__func__, event->event, event->device->name, context);
+	if (ep->rep_connected == 1) {
+		ep->rep_connected = -EIO;
+		ep->rep_func(ep);
+		wake_up_all(&ep->rep_connect_wait);
+	}
+}
+
+static inline
+void rpcrdma_event_process(struct ib_wc *wc)
+{
+	struct rpcrdma_rep *rep =
+			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
+
+	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
+		__func__, rep, wc->status, wc->opcode, wc->byte_len);
+
+	if (!rep) /* send or bind completion that we don't care about */
+		return;
+
+	if (wc->status != IB_WC_SUCCESS) {
+		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
+			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
+			 wc->status);
+		rep->rr_len = ~0U;
+		rpcrdma_schedule_tasklet(rep);
+		return;
+	}
+
+	switch (wc->opcode) {
+	case IB_WC_RECV:
+		rep->rr_len = wc->byte_len;
+		ib_dma_sync_single_for_cpu(
+			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
+		/* Keep (only) the most recent credits, after checking validity */
+		if (rep->rr_len >= 16) {
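+			/* 16 bytes spans the fixed RPC/RDMA header words,
+			 * so rm_credit is safe to read here */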
+			struct rpcrdma_msg *p =
+					(struct rpcrdma_msg *) rep->rr_base;
+			unsigned int credits = ntohl(p->rm_credit);
+			if (credits == 0) {
+				dprintk("RPC:       %s: server"
+					" dropped credits to 0!\n", __func__);
+				/* don't deadlock */
+				credits = 1;
+			} else if (credits > rep->rr_buffer->rb_max_requests) {
+				dprintk("RPC:       %s: server"
+					" over-crediting: %d (%d)\n",
+					__func__, credits,
+					rep->rr_buffer->rb_max_requests);
+				credits = rep->rr_buffer->rb_max_requests;
+			}
+			atomic_set(&rep->rr_buffer->rb_credits, credits);
+		}
+		/* fall through */
+	case IB_WC_BIND_MW:
+		rpcrdma_schedule_tasklet(rep);
+		break;
+	default:
+		dprintk("RPC:       %s: unexpected WC event %X\n",
+			__func__, wc->opcode);
+		break;
+	}
+}
+
+static inline int
+rpcrdma_cq_poll(struct ib_cq *cq)
+{
+	struct ib_wc wc;
+	int rc;
+
+	for (;;) {
+		rc = ib_poll_cq(cq, 1, &wc);
+		if (rc < 0) {
+			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
+				__func__, rc);
+			return rc;
+		}
+		if (rc == 0)
+			break;
+
+		rpcrdma_event_process(&wc);
+	}
+
+	return 0;
+}
+
+/*
+ * rpcrdma_cq_event_upcall
+ *
+ * This upcall handles recv, send, bind and unbind events.
+ * It is reentrant but processes events one at a time in order to
+ * maintain the ordering of receives, which preserves server credits.
+ *
+ * It is the responsibility of the scheduled tasklet to return
+ * recv buffers to the pool. NOTE: this affects synchronization of
+ * connection shutdown. That is, the structures required for
+ * the completion of the reply handler must remain intact until
+ * all memory has been reclaimed.
+ *
+ * Note that send events are suppressed and do not result in an upcall.
+ */
+static void
+rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
+{
+	int rc;
+
+	rc = rpcrdma_cq_poll(cq);
+	if (rc)
+		return;
+
+	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+	if (rc) {
+		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
+			__func__, rc);
+		return;
+	}
+
+	rpcrdma_cq_poll(cq);
+}
+
+#ifdef RPC_DEBUG
+static const char * const conn[] = {
+	"address resolved",
+	"address error",
+	"route resolved",
+	"route error",
+	"connect request",
+	"connect response",
+	"connect error",
+	"unreachable",
+	"rejected",
+	"established",
+	"disconnected",
+	"device removal"
+};
+#endif
+
+static int
+rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
+{
+	struct rpcrdma_xprt *xprt = id->context;
+	struct rpcrdma_ia *ia = &xprt->rx_ia;
+	struct rpcrdma_ep *ep = &xprt->rx_ep;
+	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
+	struct ib_qp_attr attr;
+	struct ib_qp_init_attr iattr;
+	int connstate = 0;
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_ADDR_RESOLVED:
+	case RDMA_CM_EVENT_ROUTE_RESOLVED:
+		complete(&ia->ri_done);
+		break;
+	case RDMA_CM_EVENT_ADDR_ERROR:
+		ia->ri_async_rc = -EHOSTUNREACH;
+		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
+			__func__, ep);
+		complete(&ia->ri_done);
+		break;
+	case RDMA_CM_EVENT_ROUTE_ERROR:
+		ia->ri_async_rc = -ENETUNREACH;
+		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
+			__func__, ep);
+		complete(&ia->ri_done);
+		break;
+	case RDMA_CM_EVENT_ESTABLISHED:
+		connstate = 1;
+		ib_query_qp(ia->ri_id->qp, &attr,
+			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
+			&iattr);
+		dprintk("RPC:       %s: %d responder resources"
+			" (%d initiator)\n",
+			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
+		goto connected;
+	case RDMA_CM_EVENT_CONNECT_ERROR:
+		connstate = -ENOTCONN;
+		goto connected;
+	case RDMA_CM_EVENT_UNREACHABLE:
+		connstate = -ENETDOWN;
+		goto connected;
+	case RDMA_CM_EVENT_REJECTED:
+		connstate = -ECONNREFUSED;
+		goto connected;
+	case RDMA_CM_EVENT_DISCONNECTED:
+		connstate = -ECONNABORTED;
+		goto connected;
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		connstate = -ENODEV;
+connected:
+		dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
+			" (ep 0x%p event 0x%x)\n",
+			__func__,
+			(event->event <= 11) ? conn[event->event] :
+						"unknown connection error",
+			NIPQUAD(addr->sin_addr.s_addr),
+			ntohs(addr->sin_port),
+			ep, event->event);
+		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
+		dprintk("RPC:       %s: %sconnected\n",
+					__func__, connstate > 0 ? "" : "dis");
+		ep->rep_connected = connstate;
+		ep->rep_func(ep);
+		wake_up_all(&ep->rep_connect_wait);
+		break;
+	default:
+		ia->ri_async_rc = -EINVAL;
+		dprintk("RPC:       %s: unexpected CM event %X\n",
+			__func__, event->event);
+		complete(&ia->ri_done);
+		break;
+	}
+
+	return 0;
+}
+
+static struct rdma_cm_id *
+rpcrdma_create_id(struct rpcrdma_xprt *xprt,
+			struct rpcrdma_ia *ia, struct sockaddr *addr)
+{
+	struct rdma_cm_id *id;
+	int rc;
+
+	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
+	if (IS_ERR(id)) {
+		rc = PTR_ERR(id);
+		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
+			__func__, rc);
+		return id;
+	}
+
+	ia->ri_async_rc = 0;
+	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
+	if (rc) {
+		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
+			__func__, rc);
+		goto out;
+	}
+	wait_for_completion(&ia->ri_done);
+	rc = ia->ri_async_rc;
+	if (rc)
+		goto out;
+
+	ia->ri_async_rc = 0;
+	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
+	if (rc) {
+		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
+			__func__, rc);
+		goto out;
+	}
+	wait_for_completion(&ia->ri_done);
+	rc = ia->ri_async_rc;
+	if (rc)
+		goto out;
+
+	return id;
+
+out:
+	rdma_destroy_id(id);
+	return ERR_PTR(rc);
+}
+
+/*
+ * Drain any cq, prior to teardown.
+ */
+static void
+rpcrdma_clean_cq(struct ib_cq *cq)
+{
+	struct ib_wc wc;
+	int count = 0;
+
+	while (1 == ib_poll_cq(cq, 1, &wc))
+		++count;
+
+	if (count)
+		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
+			__func__, count, wc.opcode);
+}
+
+/*
+ * Exported functions.
+ */
+
+/*
+ * Open and initialize an Interface Adapter.
+ *  o initializes fields of struct rpcrdma_ia, including
+ *    interface and provider attributes and protection zone.
+ */
+int
+rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
+{
+	int rc;
+	struct rpcrdma_ia *ia = &xprt->rx_ia;
+
+	init_completion(&ia->ri_done);
+
+	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
+	if (IS_ERR(ia->ri_id)) {
+		rc = PTR_ERR(ia->ri_id);
+		goto out1;
+	}
+
+	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+	if (IS_ERR(ia->ri_pd)) {
+		rc = PTR_ERR(ia->ri_pd);
+		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
+			__func__, rc);
+		goto out2;
+	}
+
+	/*
+	 * Optionally obtain an underlying physical identity mapping in
+	 * order to do a memory window-based bind. This base registration
+	 * is protected from remote access; remote access is enabled only
+	 * by binding the specific bytes targeted during each RPC operation,
+	 * and is revoked after the corresponding completion, much as a
+	 * storage adapter would do.
+	 */
+	if (memreg > RPCRDMA_REGISTER) {
+		int mem_priv = IB_ACCESS_LOCAL_WRITE;
+		switch (memreg) {
+#if RPCRDMA_PERSISTENT_REGISTRATION
+		case RPCRDMA_ALLPHYSICAL:
+			mem_priv |= IB_ACCESS_REMOTE_WRITE;
+			mem_priv |= IB_ACCESS_REMOTE_READ;
+			break;
+#endif
+		case RPCRDMA_MEMWINDOWS_ASYNC:
+		case RPCRDMA_MEMWINDOWS:
+			mem_priv |= IB_ACCESS_MW_BIND;
+			break;
+		default:
+			break;
+		}
+		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
+		if (IS_ERR(ia->ri_bind_mem)) {
+			printk(KERN_ALERT "%s: ib_get_dma_mr for "
+				"phys register failed with %lX\n\t"
+				"Will continue with degraded performance\n",
+				__func__, PTR_ERR(ia->ri_bind_mem));
+			memreg = RPCRDMA_REGISTER;
+			ia->ri_bind_mem = NULL;
+		}
+	}
+
+	/* Else will do memory reg/dereg for each chunk */
+	ia->ri_memreg_strategy = memreg;
+
+	return 0;
+out2:
+	rdma_destroy_id(ia->ri_id);
+out1:
+	return rc;
+}
+
+/*
+ * Clean up/close an IA.
+ *   o if event handles and PD have been initialized, free them.
+ *   o close the IA
+ */
+void
+rpcrdma_ia_close(struct rpcrdma_ia *ia)
+{
+	int rc;
+
+	dprintk("RPC:       %s: entering\n", __func__);
+	if (ia->ri_bind_mem != NULL) {
+		rc = ib_dereg_mr(ia->ri_bind_mem);
+		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
+			__func__, rc);
+	}
+	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
+		rdma_destroy_qp(ia->ri_id);
+	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
+		rc = ib_dealloc_pd(ia->ri_pd);
+		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
+			__func__, rc);
+	}
+	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
+		rdma_destroy_id(ia->ri_id);
+}
+
+/*
+ * Create unconnected endpoint.
+ */
+int
+rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
+				struct rpcrdma_create_data_internal *cdata)
+{
+	struct ib_device_attr devattr;
+	int rc;
+
+	rc = ib_query_device(ia->ri_id->device, &devattr);
+	if (rc) {
+		dprintk("RPC:       %s: ib_query_device failed %d\n",
+			__func__, rc);
+		return rc;
+	}
+
+	/* check provider's send/recv wr limits */
+	if (cdata->max_requests > devattr.max_qp_wr)
+		cdata->max_requests = devattr.max_qp_wr;
+
+	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
+	ep->rep_attr.qp_context = ep;
+	/* send_cq and recv_cq initialized below */
+	ep->rep_attr.srq = NULL;
+	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		/* Add room for mw_binds+unbinds - overkill! */
+		ep->rep_attr.cap.max_send_wr++;
+		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
+		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
+			return -EINVAL;
+		break;
+	default:
+		break;
+	}
+	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
+	ep->rep_attr.cap.max_recv_sge = 1;
+	ep->rep_attr.cap.max_inline_data = 0;
+	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+	ep->rep_attr.qp_type = IB_QPT_RC;
+	ep->rep_attr.port_num = ~0;
+
+	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
+		"iovs: send %d recv %d\n",
+		__func__,
+		ep->rep_attr.cap.max_send_wr,
+		ep->rep_attr.cap.max_recv_wr,
+		ep->rep_attr.cap.max_send_sge,
+		ep->rep_attr.cap.max_recv_sge);
+
+	/* set trigger for requesting send completion */
+	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
+	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
+		break;
+	default:
+		break;
+	}
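+	/* if the computed threshold is tiny, just signal every send */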
+	if (ep->rep_cqinit <= 2)
+		ep->rep_cqinit = 0;
+	INIT_CQCOUNT(ep);
+	ep->rep_ia = ia;
+	init_waitqueue_head(&ep->rep_connect_wait);
+
+	/*
+	 * Create a single cq for receive dto and mw_bind (only ever
+	 * care about unbind, really). Send completions are suppressed.
+	 * Use single threaded tasklet upcalls to maintain ordering.
+	 */
+	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
+				  rpcrdma_cq_async_error_upcall, NULL,
+				  ep->rep_attr.cap.max_recv_wr +
+				  ep->rep_attr.cap.max_send_wr + 1, 0);
+	if (IS_ERR(ep->rep_cq)) {
+		rc = PTR_ERR(ep->rep_cq);
+		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
+			__func__, rc);
+		goto out1;
+	}
+
+	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
+	if (rc) {
+		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
+			__func__, rc);
+		goto out2;
+	}
+
+	ep->rep_attr.send_cq = ep->rep_cq;
+	ep->rep_attr.recv_cq = ep->rep_cq;
+
+	/* Initialize cma parameters */
+
+	/* RPC/RDMA does not use private data */
+	ep->rep_remote_cma.private_data = NULL;
+	ep->rep_remote_cma.private_data_len = 0;
+
+	/* Client offers RDMA Read but does not initiate */
+	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_BOUNCEBUFFERS:
+		ep->rep_remote_cma.responder_resources = 0;
+		break;
+	case RPCRDMA_MTHCAFMR:
+	case RPCRDMA_REGISTER:
+		ep->rep_remote_cma.responder_resources = cdata->max_requests *
+				(RPCRDMA_MAX_DATA_SEGS / 8);
+		break;
+	case RPCRDMA_MEMWINDOWS:
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+#if RPCRDMA_PERSISTENT_REGISTRATION
+	case RPCRDMA_ALLPHYSICAL:
+#endif
+		ep->rep_remote_cma.responder_resources = cdata->max_requests *
+				(RPCRDMA_MAX_DATA_SEGS / 2);
+		break;
+	default:
+		break;
+	}
+	if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
+		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
+	ep->rep_remote_cma.initiator_depth = 0;
+
+	ep->rep_remote_cma.retry_count = 7;
+	ep->rep_remote_cma.flow_control = 0;
+	ep->rep_remote_cma.rnr_retry_count = 0;
+
+	return 0;
+
+out2:
+	(void) ib_destroy_cq(ep->rep_cq);
+out1:
+	return rc;
+}
+
+/*
+ * rpcrdma_ep_destroy
+ *
+ * Disconnect and destroy endpoint. After this, the only
+ * valid operations on the ep are to free it (if dynamically
+ * allocated) or re-create it.
+ *
+ * The caller's error handling must be sure to not leak the endpoint
+ * if this function fails.
+ */
+int
+rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+	int rc;
+
+	dprintk("RPC:       %s: entering, connected is %d\n",
+		__func__, ep->rep_connected);
+
+	if (ia->ri_id->qp) {
+		rc = rpcrdma_ep_disconnect(ep, ia);
+		if (rc)
+			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
+				" returned %i\n", __func__, rc);
+	}
+
+	ep->rep_func = NULL;
+
+	/* padding - could be done in rpcrdma_buffer_destroy... */
+	if (ep->rep_pad_mr) {
+		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
+		ep->rep_pad_mr = NULL;
+	}
+
+	if (ia->ri_id->qp) {
+		rdma_destroy_qp(ia->ri_id);
+		ia->ri_id->qp = NULL;
+	}
+
+	rpcrdma_clean_cq(ep->rep_cq);
+	rc = ib_destroy_cq(ep->rep_cq);
+	if (rc)
+		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
+			__func__, rc);
+
+	return rc;
+}
+
+/*
+ * Connect unconnected endpoint.
+ */
+int
+rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+	struct rdma_cm_id *id;
+	int rc = 0;
+	int retry_count = 0;
+	int reconnect = (ep->rep_connected != 0);
+
+	if (reconnect) {
+		struct rpcrdma_xprt *xprt;
+retry:
+		rc = rpcrdma_ep_disconnect(ep, ia);
+		if (rc && rc != -ENOTCONN)
+			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
+				" status %i\n", __func__, rc);
+		rpcrdma_clean_cq(ep->rep_cq);
+
+		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+		id = rpcrdma_create_id(xprt, ia,
+				(struct sockaddr *)&xprt->rx_data.addr);
+		if (IS_ERR(id)) {
+			rc = PTR_ERR(id);
+			goto out;
+		}
+		/* TEMP TEMP TEMP - fail if new device:
+		 * Deregister/remarshal *all* requests!
+		 * Close and recreate adapter, pd, etc!
+		 * Re-determine all attributes still sane!
+		 * More stuff I haven't thought of!
+		 * Rrrgh!
+		 */
+		if (ia->ri_id->device != id->device) {
+			printk("RPC:       %s: can't reconnect on "
+				"different device!\n", __func__);
+			rdma_destroy_id(id);
+			rc = -ENETDOWN;
+			goto out;
+		}
+		/* END TEMP */
+		rdma_destroy_id(ia->ri_id);
+		ia->ri_id = id;
+	}
+
+	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+	if (rc) {
+		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
+			__func__, rc);
+		goto out;
+	}
+
+/* XXX Tavor device performs badly with 2K MTU! */
+if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
+	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
+	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
+	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
+	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
+		struct ib_qp_attr attr = {
+			.path_mtu = IB_MTU_1024
+		};
+		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
+	}
+}
+
+	/* Theoretically a client initiator_depth > 0 is not needed,
+	 * but many peers fail to complete the connection unless
+	 * initiator_depth == responder_resources! */
+	if (ep->rep_remote_cma.initiator_depth !=
+				ep->rep_remote_cma.responder_resources)
+		ep->rep_remote_cma.initiator_depth =
+			ep->rep_remote_cma.responder_resources;
+
+	ep->rep_connected = 0;
+
+	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
+	if (rc) {
+		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
+				__func__, rc);
+		goto out;
+	}
+
+	if (reconnect)
+		return 0;
+
+	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
+
+	/*
+	 * Check state. A non-peer reject indicates no listener
+	 * (ECONNREFUSED), which may be a transient state. All
+	 * others indicate a transport condition which has already
+	 * undergone best-effort recovery.
+	 */
+	if (ep->rep_connected == -ECONNREFUSED
+	    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
+		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
+		goto retry;
+	}
+	if (ep->rep_connected <= 0) {
+		/* Sometimes, the only way to reliably connect to remote
+		 * CMs is to use the same nonzero values for ORD and IRD. */
+		ep->rep_remote_cma.initiator_depth =
+					ep->rep_remote_cma.responder_resources;
+		if (ep->rep_remote_cma.initiator_depth == 0)
+			++ep->rep_remote_cma.initiator_depth;
+		if (ep->rep_remote_cma.responder_resources == 0)
+			++ep->rep_remote_cma.responder_resources;
+		if (retry_count++ == 0)
+			goto retry;
+		rc = ep->rep_connected;
+	} else {
+		dprintk("RPC:       %s: connected\n", __func__);
+	}
+
+out:
+	if (rc)
+		ep->rep_connected = rc;
+	return rc;
+}
+
+/*
+ * rpcrdma_ep_disconnect
+ *
+ * This is separate from destroy to facilitate the ability
+ * to reconnect without recreating the endpoint.
+ *
+ * This call is not reentrant, and must not be made in parallel
+ * on the same endpoint.
+ */
+int
+rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+	int rc;
+
+	rpcrdma_clean_cq(ep->rep_cq);
+	rc = rdma_disconnect(ia->ri_id);
+	if (!rc) {
+		/* returns without wait if not connected */
+		wait_event_interruptible(ep->rep_connect_wait,
+							ep->rep_connected != 1);
+		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
+			(ep->rep_connected == 1) ? "still " : "dis");
+	} else {
+		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
+		ep->rep_connected = rc;
+	}
+	return rc;
+}
+
+/*
+ * Initialize buffer memory
+ */
+int
+rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
+	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
+{
+	char *p;
+	size_t len;
+	int i, rc;
+
+	buf->rb_max_requests = cdata->max_requests;
+	spin_lock_init(&buf->rb_lock);
+	atomic_set(&buf->rb_credits, 1);
+
+	/* Need to allocate:
+	 *   1.  arrays for send and recv pointers
+	 *   2.  arrays of struct rpcrdma_req to fill in pointers
+	 *   3.  array of struct rpcrdma_rep for replies
+	 *   4.  padding, if any
+	 *   5.  mw's, if any
+	 * Send/recv buffers in req/rep need to be registered
+	 */
+
+	len = buf->rb_max_requests *
+		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
+	len += cdata->padding;
+	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_MTHCAFMR:
+		/* TBD we are perhaps overallocating here */
+		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
+				sizeof(struct rpcrdma_mw);
+		break;
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
+				sizeof(struct rpcrdma_mw);
+		break;
+	default:
+		break;
+	}
+
+	/* allocate 1, 4 and 5 in one shot */
+	p = kzalloc(len, GFP_KERNEL);
+	if (p == NULL) {
+		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
+			__func__, len);
+		rc = -ENOMEM;
+		goto out;
+	}
+	buf->rb_pool = p;	/* for freeing it later */
+
+	buf->rb_send_bufs = (struct rpcrdma_req **) p;
+	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
+	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
+	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
+
+	/*
+	 * Register the zeroed pad buffer, if any.
+	 */
+	if (cdata->padding) {
+		rc = rpcrdma_register_internal(ia, p, cdata->padding,
+					    &ep->rep_pad_mr, &ep->rep_pad);
+		if (rc)
+			goto out;
+	}
+	p += cdata->padding;
+
+	/*
+	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
+	 * We "cycle" the mw's in order to minimize rkey reuse,
+	 * and also reduce unbind-to-bind collision.
+	 */
+	INIT_LIST_HEAD(&buf->rb_mws);
+	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_MTHCAFMR:
+		{
+		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
+		struct ib_fmr_attr fa = {
+			RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
+		};
+		/* TBD we are perhaps overallocating here */
+		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
+			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
+				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
+				&fa);
+			if (IS_ERR(r->r.fmr)) {
+				rc = PTR_ERR(r->r.fmr);
+				dprintk("RPC:       %s: ib_alloc_fmr"
+					" failed %i\n", __func__, rc);
+				goto out;
+			}
+			list_add(&r->mw_list, &buf->rb_mws);
+			++r;
+		}
+		}
+		break;
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		{
+		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
+		/* Allocate one extra request's worth, for full cycling */
+		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
+			r->r.mw = ib_alloc_mw(ia->ri_pd);
+			if (IS_ERR(r->r.mw)) {
+				rc = PTR_ERR(r->r.mw);
+				dprintk("RPC:       %s: ib_alloc_mw"
+					" failed %i\n", __func__, rc);
+				goto out;
+			}
+			list_add(&r->mw_list, &buf->rb_mws);
+			++r;
+		}
+		}
+		break;
+	default:
+		break;
+	}
+
+	/*
+	 * Allocate/init the request/reply buffers. Doing this
+	 * using kmalloc for now -- one for each buf.
+	 */
+	for (i = 0; i < buf->rb_max_requests; i++) {
+		struct rpcrdma_req *req;
+		struct rpcrdma_rep *rep;
+
+		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
+		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
+		/* Typical ~2400b, so rounding up saves work later */
+		if (len < 4096)
+			len = 4096;
+		req = kmalloc(len, GFP_KERNEL);
+		if (req == NULL) {
+			dprintk("RPC:       %s: request buffer %d alloc"
+				" failed\n", __func__, i);
+			rc = -ENOMEM;
+			goto out;
+		}
+		memset(req, 0, sizeof(struct rpcrdma_req));
+		buf->rb_send_bufs[i] = req;
+		buf->rb_send_bufs[i]->rl_buffer = buf;
+
+		rc = rpcrdma_register_internal(ia, req->rl_base,
+				len - offsetof(struct rpcrdma_req, rl_base),
+				&buf->rb_send_bufs[i]->rl_handle,
+				&buf->rb_send_bufs[i]->rl_iov);
+		if (rc)
+			goto out;
+
+		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
+
+		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
+		rep = kmalloc(len, GFP_KERNEL);
+		if (rep == NULL) {
+			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
+				__func__, i);
+			rc = -ENOMEM;
+			goto out;
+		}
+		memset(rep, 0, sizeof(struct rpcrdma_rep));
+		buf->rb_recv_bufs[i] = rep;
+		buf->rb_recv_bufs[i]->rr_buffer = buf;
+		init_waitqueue_head(&rep->rr_unbind);
+
+		rc = rpcrdma_register_internal(ia, rep->rr_base,
+				len - offsetof(struct rpcrdma_rep, rr_base),
+				&buf->rb_recv_bufs[i]->rr_handle,
+				&buf->rb_recv_bufs[i]->rr_iov);
+		if (rc)
+			goto out;
+
+	}
+	dprintk("RPC:       %s: max_requests %d\n",
+		__func__, buf->rb_max_requests);
+	/* done */
+	return 0;
+out:
+	rpcrdma_buffer_destroy(buf);
+	return rc;
+}
+
+/*
+ * Unregister and destroy buffer memory. Need to deal with
+ * partial initialization, so it's callable from failed create.
+ * Must be called before destroying endpoint, as registrations
+ * reference it.
+ */
+void
+rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
+{
+	int rc, i;
+	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+
+	/* clean up in reverse order from create
+	 *   1.  recv mr memory (mr free, then kfree)
+	 *   1a. bind mw memory
+	 *   2.  send mr memory (mr free, then kfree)
+	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
+	 *   4.  arrays
+	 */
+	dprintk("RPC:       %s: entering\n", __func__);
+
+	for (i = 0; i < buf->rb_max_requests; i++) {
+		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
+			rpcrdma_deregister_internal(ia,
+					buf->rb_recv_bufs[i]->rr_handle,
+					&buf->rb_recv_bufs[i]->rr_iov);
+			kfree(buf->rb_recv_bufs[i]);
+		}
+		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
+			while (!list_empty(&buf->rb_mws)) {
+				struct rpcrdma_mw *r;
+				r = list_entry(buf->rb_mws.next,
+					struct rpcrdma_mw, mw_list);
+				list_del(&r->mw_list);
+				switch (ia->ri_memreg_strategy) {
+				case RPCRDMA_MTHCAFMR:
+					rc = ib_dealloc_fmr(r->r.fmr);
+					if (rc)
+						dprintk("RPC:       %s:"
+							" ib_dealloc_fmr"
+							" failed %i\n",
+							__func__, rc);
+					break;
+				case RPCRDMA_MEMWINDOWS_ASYNC:
+				case RPCRDMA_MEMWINDOWS:
+					rc = ib_dealloc_mw(r->r.mw);
+					if (rc)
+						dprintk("RPC:       %s:"
+							" ib_dealloc_mw"
+							" failed %i\n",
+							__func__, rc);
+					break;
+				default:
+					break;
+				}
+			}
+			rpcrdma_deregister_internal(ia,
+					buf->rb_send_bufs[i]->rl_handle,
+					&buf->rb_send_bufs[i]->rl_iov);
+			kfree(buf->rb_send_bufs[i]);
+		}
+	}
+
+	kfree(buf->rb_pool);
+}
+
+/*
+ * Get a set of request/reply buffers.
+ *
+ * Reply buffer (if needed) is attached to send buffer upon return.
+ * Rule:
+ *    rb_send_index and rb_recv_index MUST always be pointing to the
+ *    *next* available buffer (non-NULL). They are incremented after
+ *    removing buffers, and decremented *before* returning them.
+ */
+struct rpcrdma_req *
+rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
+{
+	struct rpcrdma_req *req;
+	unsigned long flags;
+
+	spin_lock_irqsave(&buffers->rb_lock, flags);
+	if (buffers->rb_send_index == buffers->rb_max_requests) {
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+		dprintk("RPC:       %s: out of request buffers\n", __func__);
+		return NULL;
+	}
+
+	req = buffers->rb_send_bufs[buffers->rb_send_index];
+	if (buffers->rb_send_index < buffers->rb_recv_index) {
+		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
+			__func__,
+			buffers->rb_recv_index - buffers->rb_send_index);
+		req->rl_reply = NULL;
+	} else {
+		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+	}
+	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+	if (!list_empty(&buffers->rb_mws)) {
+		int i = RPCRDMA_MAX_SEGS - 1;
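+		/* detach one full set of mws from the shared pool and
+		 * stash them in this request's segment array */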
+		do {
+			struct rpcrdma_mw *r;
+			r = list_entry(buffers->rb_mws.next,
+					struct rpcrdma_mw, mw_list);
+			list_del(&r->mw_list);
+			req->rl_segments[i].mr_chunk.rl_mw = r;
+		} while (--i >= 0);
+	}
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	return req;
+}
+
+/*
+ * Put request/reply buffers back into pool.
+ * Pre-decrement counter/array index.
+ */
+void
+rpcrdma_buffer_put(struct rpcrdma_req *req)
+{
+	struct rpcrdma_buffer *buffers = req->rl_buffer;
+	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
+	int i;
+	unsigned long flags;
+
+	BUG_ON(req->rl_nchunks != 0);
+	spin_lock_irqsave(&buffers->rb_lock, flags);
+	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
+	req->rl_niovs = 0;
+	if (req->rl_reply) {
+		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
+		init_waitqueue_head(&req->rl_reply->rr_unbind);
+		req->rl_reply->rr_func = NULL;
+		req->rl_reply = NULL;
+	}
+	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_MTHCAFMR:
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		/*
+		 * Cycle mw's back in reverse order, and "spin" them.
+		 * This delays and scrambles reuse as much as possible.
+		 */
+		i = 1;
+		do {
+			struct rpcrdma_mw **mw;
+			mw = &req->rl_segments[i].mr_chunk.rl_mw;
+			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
+			*mw = NULL;
+		} while (++i < RPCRDMA_MAX_SEGS);
+		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
+					&buffers->rb_mws);
+		req->rl_segments[0].mr_chunk.rl_mw = NULL;
+		break;
+	default:
+		break;
+	}
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Recover reply buffers from pool.
+ * This happens when recovering from error conditions.
+ * Post-increment counter/array index.
+ */
+void
+rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
+{
+	struct rpcrdma_buffer *buffers = req->rl_buffer;
+	unsigned long flags;
+
+	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
+		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
+	spin_lock_irqsave(&buffers->rb_lock, flags);
+	if (buffers->rb_recv_index < buffers->rb_max_requests) {
+		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+	}
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Put reply buffers back into pool when not attached to
+ * request. This happens in error conditions, and when
+ * aborting unbinds. Pre-decrement counter/array index.
+ */
+void
+rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_buffer *buffers = rep->rr_buffer;
+	unsigned long flags;
+
+	rep->rr_func = NULL;
+	spin_lock_irqsave(&buffers->rb_lock, flags);
+	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Wrappers for internal-use kmalloc memory registration, used by buffer code.
+ */
+
+int
+rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
+				struct ib_mr **mrp, struct ib_sge *iov)
+{
+	struct ib_phys_buf ipb;
+	struct ib_mr *mr;
+	int rc;
+
+	/*
+	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
+	 */
+	iov->addr = ib_dma_map_single(ia->ri_id->device,
+			va, len, DMA_BIDIRECTIONAL);
+	iov->length = len;
+
+	if (ia->ri_bind_mem != NULL) {
+		*mrp = NULL;
+		iov->lkey = ia->ri_bind_mem->lkey;
+		return 0;
+	}
+
+	ipb.addr = iov->addr;
+	ipb.size = iov->length;
+	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
+			IB_ACCESS_LOCAL_WRITE, &iov->addr);
+
+	dprintk("RPC:       %s: phys convert: 0x%llx "
+			"registered 0x%llx length %d\n",
+			__func__, ipb.addr, iov->addr, len);
+
+	if (IS_ERR(mr)) {
+		*mrp = NULL;
+		rc = PTR_ERR(mr);
+		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
+	} else {
+		*mrp = mr;
+		iov->lkey = mr->lkey;
+		rc = 0;
+	}
+
+	return rc;
+}
+
+int
+rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
+				struct ib_mr *mr, struct ib_sge *iov)
+{
+	int rc;
+
+	ib_dma_unmap_single(ia->ri_id->device,
+			iov->addr, iov->length, DMA_BIDIRECTIONAL);
+
+	if (mr == NULL)
+		return 0;
+
+	rc = ib_dereg_mr(mr);
+	if (rc)
+		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
+	return rc;
+}
+
+/*
+ * Wrappers for chunk registration, shared by read/write chunk code.
+ */
+
+static void
+rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
+{
+	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
+	seg->mr_dmalen = seg->mr_len;
+	if (seg->mr_page)
+		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
+				seg->mr_page, offset_in_page(seg->mr_offset),
+				seg->mr_dmalen, seg->mr_dir);
+	else
+		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
+				seg->mr_offset,
+				seg->mr_dmalen, seg->mr_dir);
+}
+
+static void
+rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
+{
+	if (seg->mr_page)
+		ib_dma_unmap_page(ia->ri_id->device,
+				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
+	else
+		ib_dma_unmap_single(ia->ri_id->device,
+				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
+}
+
+int
+rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
+			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
+				  IB_ACCESS_REMOTE_READ);
+	struct rpcrdma_mr_seg *seg1 = seg;
+	int i;
+	int rc = 0;
+
+	switch (ia->ri_memreg_strategy) {
+
+#if RPCRDMA_PERSISTENT_REGISTRATION
+	case RPCRDMA_ALLPHYSICAL:
+		rpcrdma_map_one(ia, seg, writing);
+		seg->mr_rkey = ia->ri_bind_mem->rkey;
+		seg->mr_base = seg->mr_dma;
+		seg->mr_nsegs = 1;
+		nsegs = 1;
+		break;
+#endif
+
+	/* Registration using fast memory registration */
+	case RPCRDMA_MTHCAFMR:
+		{
+		u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
+		int len, pageoff = offset_in_page(seg->mr_offset);
+		seg1->mr_offset -= pageoff;	/* start of page */
+		seg1->mr_len += pageoff;
+		len = -pageoff;
+		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
+			nsegs = RPCRDMA_MAX_DATA_SEGS;
+		for (i = 0; i < nsegs;) {
+			rpcrdma_map_one(ia, seg, writing);
+			physaddrs[i] = seg->mr_dma;
+			len += seg->mr_len;
+			++seg;
+			++i;
+			/* Check for holes */
+			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
+			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
+				break;
+		}
+		nsegs = i;
+		rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
+					physaddrs, nsegs, seg1->mr_dma);
+		if (rc) {
+			dprintk("RPC:       %s: failed ib_map_phys_fmr "
+				"%u@0x%llx+%i (%d)... status %i\n", __func__,
+				len, (unsigned long long)seg1->mr_dma,
+				pageoff, nsegs, rc);
+			while (nsegs--)
+				rpcrdma_unmap_one(ia, --seg);
+		} else {
+			seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
+			seg1->mr_base = seg1->mr_dma + pageoff;
+			seg1->mr_nsegs = nsegs;
+			seg1->mr_len = len;
+		}
+		}
+		break;
+
+	/* Registration using memory windows */
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		{
+		struct ib_mw_bind param;
+		rpcrdma_map_one(ia, seg, writing);
+		param.mr = ia->ri_bind_mem;
+		param.wr_id = 0ULL;	/* no send cookie */
+		param.addr = seg->mr_dma;
+		param.length = seg->mr_len;
+		param.send_flags = 0;
+		param.mw_access_flags = mem_priv;
+
+		DECR_CQCOUNT(&r_xprt->rx_ep);
+		rc = ib_bind_mw(ia->ri_id->qp,
+					seg->mr_chunk.rl_mw->r.mw, &param);
+		if (rc) {
+			dprintk("RPC:       %s: failed ib_bind_mw "
+				"%u@0x%llx status %i\n",
+				__func__, seg->mr_len,
+				(unsigned long long)seg->mr_dma, rc);
+			rpcrdma_unmap_one(ia, seg);
+		} else {
+			seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
+			seg->mr_base = param.addr;
+			seg->mr_nsegs = 1;
+			nsegs = 1;
+		}
+		}
+		break;
+
+	/* Default registration each time */
+	default:
+		{
+		struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
+		int len = 0;
+		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
+			nsegs = RPCRDMA_MAX_DATA_SEGS;
+		for (i = 0; i < nsegs;) {
+			rpcrdma_map_one(ia, seg, writing);
+			ipb[i].addr = seg->mr_dma;
+			ipb[i].size = seg->mr_len;
+			len += seg->mr_len;
+			++seg;
+			++i;
+			/* Check for holes */
+			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
+			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
+				break;
+		}
+		nsegs = i;
+		seg1->mr_base = seg1->mr_dma;
+		seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
+					ipb, nsegs, mem_priv, &seg1->mr_base);
+		if (IS_ERR(seg1->mr_chunk.rl_mr)) {
+			rc = PTR_ERR(seg1->mr_chunk.rl_mr);
+			dprintk("RPC:       %s: failed ib_reg_phys_mr "
+				"%u@0x%llx (%d)... status %i\n",
+				__func__, len,
+				(unsigned long long)seg1->mr_dma, nsegs, rc);
+			while (nsegs--)
+				rpcrdma_unmap_one(ia, --seg);
+		} else {
+			seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
+			seg1->mr_nsegs = nsegs;
+			seg1->mr_len = len;
+		}
+		}
+		break;
+	}
+	if (rc)
+		return -1;
+
+	return nsegs;
+}
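
Since rpcrdma_register_external() coalesces as many page-contiguous segments as the strategy allows and returns the count consumed (or -1), a marshaling caller can walk a segment array one chunk at a time. A hedged sketch, not taken from rpc_rdma.c:

/* Hypothetical marshaling loop; only rpcrdma_register_external()
 * and its return convention come from the patch. */
static int example_register_chunks(struct rpcrdma_mr_seg *seg, int total,
				   int writing, struct rpcrdma_xprt *r_xprt)
{
	int done = 0, n;

	while (done < total) {
		n = rpcrdma_register_external(seg + done, total - done,
					      writing, r_xprt);
		if (n < 0)
			return -EIO;	/* caller unwinds prior chunks */
		done += n;		/* one chunk == n coalesced segments */
	}
	return done;
}
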
+
+int
+rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
+		struct rpcrdma_xprt *r_xprt, void *r)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_mr_seg *seg1 = seg;
+	int nsegs = seg->mr_nsegs, rc;
+
+	switch (ia->ri_memreg_strategy) {
+
+#if RPCRDMA_PERSISTENT_REGISTRATION
+	case RPCRDMA_ALLPHYSICAL:
+		BUG_ON(nsegs != 1);
+		rpcrdma_unmap_one(ia, seg);
+		rc = 0;
+		break;
+#endif
+
+	case RPCRDMA_MTHCAFMR:
+		{
+		LIST_HEAD(l);
+		list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
+		rc = ib_unmap_fmr(&l);
+		while (seg1->mr_nsegs--)
+			rpcrdma_unmap_one(ia, seg++);
+		}
+		if (rc)
+			dprintk("RPC:       %s: failed ib_unmap_fmr,"
+				" status %i\n", __func__, rc);
+		break;
+
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		{
+		struct ib_mw_bind param;
+		BUG_ON(nsegs != 1);
+		param.mr = ia->ri_bind_mem;
+		param.addr = 0ULL;	/* unbind */
+		param.length = 0;
+		param.mw_access_flags = 0;
+		if (r) {
+			param.wr_id = (u64) (unsigned long) r;
+			param.send_flags = IB_SEND_SIGNALED;
+			INIT_CQCOUNT(&r_xprt->rx_ep);
+		} else {
+			param.wr_id = 0ULL;
+			param.send_flags = 0;
+			DECR_CQCOUNT(&r_xprt->rx_ep);
+		}
+		rc = ib_bind_mw(ia->ri_id->qp,
+				seg->mr_chunk.rl_mw->r.mw, &param);
+		rpcrdma_unmap_one(ia, seg);
+		}
+		if (rc)
+			dprintk("RPC:       %s: failed ib_(un)bind_mw,"
+				" status %i\n", __func__, rc);
+		else
+			r = NULL;	/* will upcall on completion */
+		break;
+
+	default:
+		rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
+		seg1->mr_chunk.rl_mr = NULL;
+		while (seg1->mr_nsegs--)
+			rpcrdma_unmap_one(ia, seg++);
+		if (rc)
+			dprintk("RPC:       %s: failed ib_dereg_mr,"
+				" status %i\n", __func__, rc);
+		break;
+	}
+	if (r) {
+		struct rpcrdma_rep *rep = r;
+		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
+		rep->rr_func = NULL;
+		func(rep);	/* dereg done, callback now */
+	}
+	return nsegs;
+}
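
The final r parameter doubles as an optional completion cookie: for the memory-window strategies a non-NULL rpcrdma_rep requests a signaled unbind and defers rep->rr_func to the send-completion upcall; in every other case the callback, if any, runs synchronously above. A hedged sketch of the two call shapes:

/* Hypothetical wrapper showing the two ways to invoke deregistration. */
static void example_deregister(struct rpcrdma_mr_seg *seg,
			       struct rpcrdma_xprt *r_xprt,
			       struct rpcrdma_rep *rep)
{
	if (rep)
		/* memory windows: signaled unbind; rep->rr_func fires
		 * later, from the send-completion upcall */
		rpcrdma_deregister_external(seg, r_xprt, rep);
	else
		/* teardown completes before this call returns */
		rpcrdma_deregister_external(seg, r_xprt, NULL);
}
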
+
+/*
+ * Prepost any receive buffer, then post send.
+ *
+ * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ */
+int
+rpcrdma_ep_post(struct rpcrdma_ia *ia,
+		struct rpcrdma_ep *ep,
+		struct rpcrdma_req *req)
+{
+	struct ib_send_wr send_wr, *send_wr_fail;
+	struct rpcrdma_rep *rep = req->rl_reply;
+	int rc;
+
+	if (rep) {
+		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		if (rc)
+			goto out;
+		req->rl_reply = NULL;
+	}
+
+	send_wr.next = NULL;
+	send_wr.wr_id = 0ULL;	/* no send cookie */
+	send_wr.sg_list = req->rl_send_iov;
+	send_wr.num_sge = req->rl_niovs;
+	send_wr.opcode = IB_WR_SEND;
+	send_wr.imm_data = 0;
+	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
+		ib_dma_sync_single_for_device(ia->ri_id->device,
+			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
+			DMA_TO_DEVICE);
+	ib_dma_sync_single_for_device(ia->ri_id->device,
+		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
+		DMA_TO_DEVICE);
+	ib_dma_sync_single_for_device(ia->ri_id->device,
+		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
+		DMA_TO_DEVICE);
+
+	if (DECR_CQCOUNT(ep) > 0)
+		send_wr.send_flags = 0;
+	else { /* Provider must take a send completion every now and then */
+		INIT_CQCOUNT(ep);
+		send_wr.send_flags = IB_SEND_SIGNALED;
+	}
+
+	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
+	if (rc)
+		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
+			rc);
+out:
+	return rc;
+}
+
+/*
+ * (Re)post a receive buffer.
+ */
+int
+rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
+		     struct rpcrdma_ep *ep,
+		     struct rpcrdma_rep *rep)
+{
+	struct ib_recv_wr recv_wr, *recv_wr_fail;
+	int rc;
+
+	recv_wr.next = NULL;
+	recv_wr.wr_id = (u64) (unsigned long) rep;
+	recv_wr.sg_list = &rep->rr_iov;
+	recv_wr.num_sge = 1;
+
+	ib_dma_sync_single_for_cpu(ia->ri_id->device,
+		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
+
+	DECR_CQCOUNT(ep);
+	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
+
+	if (rc)
+		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
+			rc);
+	return rc;
+}
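
A hedged sketch, not part of this patch, of how a transport could pre-post its whole reply ring after connecting so the provider can receive immediately:

/* Hypothetical helper; rb_max_requests and rb_recv_bufs are the
 * fields defined in struct rpcrdma_buffer (see xprt_rdma.h below). */
static int example_post_recv_ring(struct rpcrdma_ia *ia,
				  struct rpcrdma_ep *ep,
				  struct rpcrdma_buffer *buf)
{
	int i, rc;

	for (i = 0; i < buf->rb_max_requests; i++) {
		rc = rpcrdma_ep_post_recv(ia, ep, buf->rb_recv_bufs[i]);
		if (rc)
			return rc;
	}
	return 0;
}
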
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
new file mode 100644
index 0000000..2427822
--- /dev/null
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -0,0 +1,330 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _LINUX_SUNRPC_XPRT_RDMA_H
+#define _LINUX_SUNRPC_XPRT_RDMA_H
+
+#include <linux/wait.h> 		/* wait_queue_head_t, etc */
+#include <linux/spinlock.h> 		/* spinlock_t, etc */
+#include <asm/atomic.h>			/* atomic_t, etc */
+
+#include <rdma/rdma_cm.h>		/* RDMA connection api */
+#include <rdma/ib_verbs.h>		/* RDMA verbs api */
+
+#include <linux/sunrpc/clnt.h> 		/* rpc_xprt */
+#include <linux/sunrpc/rpc_rdma.h> 	/* RPC/RDMA protocol */
+#include <linux/sunrpc/xprtrdma.h> 	/* xprt parameters */
+
+/*
+ * Interface Adapter -- one per transport instance
+ */
+struct rpcrdma_ia {
+	struct rdma_cm_id 	*ri_id;
+	struct ib_pd		*ri_pd;
+	struct ib_mr		*ri_bind_mem;
+	struct completion	ri_done;
+	int			ri_async_rc;
+	enum rpcrdma_memreg	ri_memreg_strategy;
+};
+
+/*
+ * RDMA Endpoint -- one per transport instance
+ */
+
+struct rpcrdma_ep {
+	atomic_t		rep_cqcount;
+	int			rep_cqinit;
+	int			rep_connected;
+	struct rpcrdma_ia	*rep_ia;
+	struct ib_cq		*rep_cq;
+	struct ib_qp_init_attr	rep_attr;
+	wait_queue_head_t 	rep_connect_wait;
+	struct ib_sge		rep_pad;	/* holds zeroed pad */
+	struct ib_mr		*rep_pad_mr;	/* holds zeroed pad */
+	void			(*rep_func)(struct rpcrdma_ep *);
+	struct rpc_xprt		*rep_xprt;	/* for rep_func */
+	struct rdma_conn_param	rep_remote_cma;
+	struct sockaddr_storage	rep_remote_addr;
+};
+
+#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
+#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+
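These two macros implement the send-completion coalescing seen in rpcrdma_ep_post() above: every work request decrements rep_cqcount, and only when it reaches zero is IB_SEND_SIGNALED requested and the counter reset. A runnable userspace model (the cqinit value is arbitrary here; the real one derives from rep_attr):

/* Userspace model of CQCOUNT-based completion coalescing. */
#include <stdio.h>

static int cqcount;
static const int cqinit = 4;

static int post_send(void)
{
	if (--cqcount > 0)		/* DECR_CQCOUNT */
		return 0;		/* unsignaled send */
	cqcount = cqinit;		/* INIT_CQCOUNT */
	return 1;			/* IB_SEND_SIGNALED */
}

int main(void)
{
	int i;

	cqcount = cqinit;
	for (i = 1; i <= 10; i++)
		printf("send %2d: %s\n", i,
		       post_send() ? "signaled" : "unsignaled");
	return 0;
}
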
+/*
+ * struct rpcrdma_rep -- this structure encapsulates state required to recv
+ * and complete a reply, asynchronously. It needs several pieces of
+ * state:
+ *   o recv buffer (posted to provider)
+ *   o ib_sge (also donated to provider)
+ *   o status of reply (length, success or not)
+ *   o bookkeeping state used by the reply tasklet (list linkage, etc)
+ *
+ * These are allocated during initialization, per-transport instance;
+ * however, the tasklet execution list itself is global, as it should
+ * always be pretty short.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ */
+
+/* temporary static scatter/gather max */
+#define RPCRDMA_MAX_DATA_SEGS	(8)	/* max scatter/gather */
+#define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+#define MAX_RPCRDMAHDR	(\
+	/* max supported RPC/RDMA header */ \
+	sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \
+	(sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32))
+
+struct rpcrdma_buffer;
+
+struct rpcrdma_rep {
+	unsigned int	rr_len;		/* actual received reply length */
+	struct rpcrdma_buffer *rr_buffer; /* home base for this structure */
+	struct rpc_xprt	*rr_xprt;	/* needed for request/reply matching */
+	void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
+	struct list_head rr_list;	/* tasklet list */
+	wait_queue_head_t rr_unbind;	/* optional unbind wait */
+	struct ib_sge	rr_iov;		/* for posting */
+	struct ib_mr	*rr_handle;	/* handle for mem in rr_iov */
+	char	rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
+};
+
+/*
+ * struct rpcrdma_req -- structure central to the request/reply sequence.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ *
+ * It includes pre-registered buffer memory for send AND recv.
+ * The recv buffer, however, is not owned by this structure, and
+ * is "donated" to the hardware when a recv is posted. When a
+ * reply is handled, the recv buffer used is given back to the
+ * struct rpcrdma_req associated with the request.
+ *
+ * In addition to the basic memory, this structure includes an array
+ * of iovs for send operations. The reason is that the iovs passed to
+ * ib_post_{send,recv} must not be modified until the work request
+ * completes.
+ *
+ * NOTES:
+ *   o RPCRDMA_MAX_SEGS is the max number of addressable chunk elements we
+ *     marshal. The number needed varies depending on the iov lists that
+ *     are passed to us, the memory registration mode we are in, and if
+ *     physical addressing is used, the layout.
+ */
+
+struct rpcrdma_mr_seg {		/* chunk descriptors */
+	union {				/* chunk memory handles */
+		struct ib_mr	*rl_mr;		/* if registered directly */
+		struct rpcrdma_mw {		/* if registered from region */
+			union {
+				struct ib_mw	*mw;
+				struct ib_fmr	*fmr;
+			} r;
+			struct list_head mw_list;
+		} *rl_mw;
+	} mr_chunk;
+	u64		mr_base;	/* registration result */
+	u32		mr_rkey;	/* registration result */
+	u32		mr_len;		/* length of chunk or segment */
+	int		mr_nsegs;	/* number of segments in chunk or 0 */
+	enum dma_data_direction	mr_dir;	/* segment mapping direction */
+	dma_addr_t	mr_dma;		/* segment mapping address */
+	size_t		mr_dmalen;	/* segment mapping length */
+	struct page	*mr_page;	/* owning page, if any */
+	char		*mr_offset;	/* kva if no page, else offset */
+};
+
+struct rpcrdma_req {
+	size_t 		rl_size;	/* actual length of buffer */
+	unsigned int	rl_niovs;	/* 0, 2 or 4 */
+	unsigned int	rl_nchunks;	/* non-zero if chunks */
+	struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
+	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
+	struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
+	struct ib_sge	rl_send_iov[4];	/* for active requests */
+	struct ib_sge	rl_iov;		/* for posting */
+	struct ib_mr	*rl_handle;	/* handle for mem in rl_iov */
+	char		rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
+	__u32 		rl_xdr_buf[0];	/* start of returned rpc rq_buffer */
+};
+#define rpcr_to_rdmar(r) \
+	container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
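
rpcr_to_rdmar() works because the transport hands RPC a rq_buffer that points at rl_xdr_buf[], the zero-length trailing member, so container_of() can recover the enclosing rpcrdma_req. A runnable userspace model of the same trick (struct and names simplified):

/* Userspace model of the container_of() recovery; struct example_req
 * stands in for struct rpcrdma_req. */
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct example_req {
	size_t size;			/* like rl_size */
	unsigned int xdr_buf[];		/* like rl_xdr_buf[0] */
};

int main(void)
{
	struct example_req *req = malloc(sizeof(*req) + 256);
	void *rq_buffer = req->xdr_buf;	/* what RPC sees as its buffer */
	struct example_req *back;

	back = container_of(rq_buffer, struct example_req, xdr_buf);
	printf("recovered enclosing req: %s\n", back == req ? "yes" : "no");
	free(req);
	return 0;
}
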
+
+/*
+ * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for
+ * inline requests/replies, and client/server credits.
+ *
+ * One of these is associated with a transport instance
+ */
+struct rpcrdma_buffer {
+	spinlock_t	rb_lock;	/* protects indexes */
+	atomic_t	rb_credits;	/* most recent server credits */
+	unsigned long	rb_cwndscale;	/* cached framework rpc_cwndscale */
+	int		rb_max_requests;/* client max requests */
+	struct list_head rb_mws;	/* optional memory windows/fmrs */
+	int		rb_send_index;
+	struct rpcrdma_req	**rb_send_bufs;
+	int		rb_recv_index;
+	struct rpcrdma_rep	**rb_recv_bufs;
+	char		*rb_pool;
+};
+#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
+
+/*
+ * Internal structure for transport instance creation. This
+ * exists primarily for modularity.
+ *
+ * This data should be set with mount options.
+ */
+struct rpcrdma_create_data_internal {
+	struct sockaddr_storage	addr;	/* RDMA server address */
+	unsigned int	max_requests;	/* max requests (slots) in flight */
+	unsigned int	rsize;		/* mount rsize - max read hdr+data */
+	unsigned int	wsize;		/* mount wsize - max write hdr+data */
+	unsigned int	inline_rsize;	/* max non-rdma read data payload */
+	unsigned int	inline_wsize;	/* max non-rdma write data payload */
+	unsigned int	padding;	/* non-rdma write header padding */
+};
+
+#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
+	(rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_rsize)
+
+#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
+	(rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_wsize)
+
+#define RPCRDMA_INLINE_PAD_VALUE(rq)\
+	rpcx_to_rdmad(rq->rq_task->tk_xprt).padding
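
A hedged sketch of how marshaling code presumably consults these thresholds; the real decision logic lives in xprtrdma/rpc_rdma.c and is more involved:

/* Hypothetical decision helper; rq_snd_buf.len is the standard
 * struct xdr_buf length field. */
static int example_send_needs_chunks(struct rpc_rqst *rqst)
{
	return rqst->rq_snd_buf.len > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
}
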
+
+/*
+ * Statistics for RPCRDMA
+ */
+struct rpcrdma_stats {
+	unsigned long		read_chunk_count;
+	unsigned long		write_chunk_count;
+	unsigned long		reply_chunk_count;
+
+	unsigned long long	total_rdma_request;
+	unsigned long long	total_rdma_reply;
+
+	unsigned long long	pullup_copy_count;
+	unsigned long long	fixup_copy_count;
+	unsigned long		hardway_register_count;
+	unsigned long		failed_marshal_count;
+	unsigned long		bad_reply_count;
+};
+
+/*
+ * RPCRDMA transport -- encapsulates the structures above for
+ * integration with RPC.
+ *
+ * The contained structures are embedded, not pointers,
+ * for convenience. This structure need not be visible externally.
+ *
+ * It is allocated and initialized during mount, and released
+ * during unmount.
+ */
+struct rpcrdma_xprt {
+	struct rpc_xprt		xprt;
+	struct rpcrdma_ia	rx_ia;
+	struct rpcrdma_ep	rx_ep;
+	struct rpcrdma_buffer	rx_buf;
+	struct rpcrdma_create_data_internal rx_data;
+	struct delayed_work	rdma_connect;
+	struct rpcrdma_stats	rx_stats;
+};
+
+#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
+#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
+
+/*
+ * Interface Adapter calls - xprtrdma/verbs.c
+ */
+int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
+void rpcrdma_ia_close(struct rpcrdma_ia *);
+
+/*
+ * Endpoint calls - xprtrdma/verbs.c
+ */
+int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
+				struct rpcrdma_create_data_internal *);
+int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+
+int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
+				struct rpcrdma_req *);
+int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
+				struct rpcrdma_rep *);
+
+/*
+ * Buffer calls - xprtrdma/verbs.c
+ */
+int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *,
+				struct rpcrdma_ia *,
+				struct rpcrdma_create_data_internal *);
+void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+
+struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
+void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+
+int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int,
+				struct ib_mr **, struct ib_sge *);
+int rpcrdma_deregister_internal(struct rpcrdma_ia *,
+				struct ib_mr *, struct ib_sge *);
+
+int rpcrdma_register_external(struct rpcrdma_mr_seg *,
+				int, int, struct rpcrdma_xprt *);
+int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
+				struct rpcrdma_xprt *, void *);
+
+/*
+ * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
+ */
+void rpcrdma_conn_func(struct rpcrdma_ep *);
+void rpcrdma_reply_handler(struct rpcrdma_rep *);
+
+/*
+ * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
+ */
+int rpcrdma_marshal_req(struct rpc_rqst *);
+
+#endif				/* _LINUX_SUNRPC_XPRT_RDMA_H */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 282efd4..02298f5 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -13,10 +13,14 @@
  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
  *
  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
+ *
+ * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
+ *   <gilles.quillard@bull.net>
  */
 
 #include <linux/types.h>
 #include <linux/slab.h>
+#include <linux/module.h>
 #include <linux/capability.h>
 #include <linux/pagemap.h>
 #include <linux/errno.h>
@@ -28,6 +32,7 @@
 #include <linux/tcp.h>
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/xprtsock.h>
 #include <linux/file.h>
 
 #include <net/sock.h>
@@ -260,14 +265,29 @@
 #define TCP_RCV_COPY_XID	(1UL << 2)
 #define TCP_RCV_COPY_DATA	(1UL << 3)
 
-static void xs_format_peer_addresses(struct rpc_xprt *xprt)
+static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
 {
-	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
+	return (struct sockaddr *) &xprt->addr;
+}
+
+static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
+{
+	return (struct sockaddr_in *) &xprt->addr;
+}
+
+static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
+{
+	return (struct sockaddr_in6 *) &xprt->addr;
+}
+
+static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt)
+{
+	struct sockaddr_in *addr = xs_addr_in(xprt);
 	char *buf;
 
 	buf = kzalloc(20, GFP_KERNEL);
 	if (buf) {
-		snprintf(buf, 20, "%u.%u.%u.%u",
+		snprintf(buf, 20, NIPQUAD_FMT,
 				NIPQUAD(addr->sin_addr.s_addr));
 	}
 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
@@ -279,26 +299,123 @@
 	}
 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
 
-	if (xprt->prot == IPPROTO_UDP)
-		xprt->address_strings[RPC_DISPLAY_PROTO] = "udp";
-	else
-		xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp";
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf) {
+		if (xprt->prot == IPPROTO_UDP)
+			snprintf(buf, 8, "udp");
+		else
+			snprintf(buf, 8, "tcp");
+	}
+	xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
 
 	buf = kzalloc(48, GFP_KERNEL);
 	if (buf) {
-		snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s",
+		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
 			NIPQUAD(addr->sin_addr.s_addr),
 			ntohs(addr->sin_port),
 			xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
 	}
 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
+
+	buf = kzalloc(10, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 10, "%02x%02x%02x%02x",
+				NIPQUAD(addr->sin_addr.s_addr));
+	}
+	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 8, "%4hx",
+				ntohs(addr->sin_port));
+	}
+	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
+
+	buf = kzalloc(30, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
+				NIPQUAD(addr->sin_addr.s_addr),
+				ntohs(addr->sin_port) >> 8,
+				ntohs(addr->sin_port) & 0xff);
+	}
+	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
+
+	xprt->address_strings[RPC_DISPLAY_NETID] =
+		kstrdup(xprt->prot == IPPROTO_UDP ?
+			RPCBIND_NETID_UDP : RPCBIND_NETID_TCP, GFP_KERNEL);
+}
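
The RPC_DISPLAY_UNIVERSAL_ADDR string built above is the RPCBIND universal address: the dotted-quad IP followed by the port split into its high and low octets. A runnable illustration:

/* Port 2049 (NFS) on 192.168.1.5 yields "192.168.1.5.8.1". */
#include <stdio.h>

int main(void)
{
	unsigned short port = 2049;

	printf("192.168.1.5.%u.%u\n", port >> 8, port & 0xff);
	return 0;
}
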
+
+static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt)
+{
+	struct sockaddr_in6 *addr = xs_addr_in6(xprt);
+	char *buf;
+
+	buf = kzalloc(40, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 40, NIP6_FMT,
+				NIP6(addr->sin6_addr));
+	}
+	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 8, "%u",
+				ntohs(addr->sin6_port));
+	}
+	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf) {
+		if (xprt->prot == IPPROTO_UDP)
+			snprintf(buf, 8, "udp");
+		else
+			snprintf(buf, 8, "tcp");
+	}
+	xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
+
+	buf = kzalloc(64, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
+				NIP6(addr->sin6_addr),
+				ntohs(addr->sin6_port),
+				xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+	}
+	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
+
+	buf = kzalloc(36, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 36, NIP6_SEQFMT,
+				NIP6(addr->sin6_addr));
+	}
+	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 8, "%4hx",
+				ntohs(addr->sin6_port));
+	}
+	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
+
+	buf = kzalloc(50, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 50, NIP6_FMT".%u.%u",
+				NIP6(addr->sin6_addr),
+				ntohs(addr->sin6_port) >> 8,
+				ntohs(addr->sin6_port) & 0xff);
+	}
+	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
+
+	xprt->address_strings[RPC_DISPLAY_NETID] =
+		kstrdup(xprt->prot == IPPROTO_UDP ?
+			RPCBIND_NETID_UDP6 : RPCBIND_NETID_TCP6, GFP_KERNEL);
 }
 
 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 {
-	kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
-	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
-	kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
+	int i;
+
+	for (i = 0; i < RPC_DISPLAY_MAX; i++)
+		kfree(xprt->address_strings[i]);
 }
 
 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
@@ -463,19 +580,20 @@
 
 	req->rq_xtime = jiffies;
 	status = xs_sendpages(transport->sock,
-			      (struct sockaddr *) &xprt->addr,
+			      xs_addr(xprt),
 			      xprt->addrlen, xdr,
 			      req->rq_bytes_sent);
 
 	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
 			xdr->len - req->rq_bytes_sent, status);
 
-	if (likely(status >= (int) req->rq_slen))
-		return 0;
-
-	/* Still some bytes left; set up for a retry later. */
-	if (status > 0)
+	if (status >= 0) {
+		task->tk_bytes_sent += status;
+		if (status >= req->rq_slen)
+			return 0;
+		/* Still some bytes left; set up for a retry later. */
 		status = -EAGAIN;
+	}
 
 	switch (status) {
 	case -ENETUNREACH:
@@ -523,7 +641,8 @@
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
-	int status, retry = 0;
+	int status;
+	unsigned int retry = 0;
 
 	xs_encode_tcp_record_marker(&req->rq_snd_buf);
 
@@ -661,6 +780,7 @@
 	xs_free_peer_addresses(xprt);
 	kfree(xprt->slot);
 	kfree(xprt);
+	module_put(THIS_MODULE);
 }
 
 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
@@ -1139,14 +1259,23 @@
  */
 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
 {
-	struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;
+	struct sockaddr *addr = xs_addr(xprt);
 
 	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
 
-	sap->sin_port = htons(port);
+	switch (addr->sa_family) {
+	case AF_INET:
+		((struct sockaddr_in *)addr)->sin_port = htons(port);
+		break;
+	case AF_INET6:
+		((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
+		break;
+	default:
+		BUG();
+	}
 }
 
-static int xs_bind(struct sock_xprt *transport, struct socket *sock)
+static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
 {
 	struct sockaddr_in myaddr = {
 		.sin_family = AF_INET,
@@ -1174,8 +1303,42 @@
 		else
 			port--;
 	} while (err == -EADDRINUSE && port != transport->port);
-	dprintk("RPC:       xs_bind "NIPQUAD_FMT":%u: %s (%d)\n",
-		NIPQUAD(myaddr.sin_addr), port, err ? "failed" : "ok", err);
+	dprintk("RPC:       %s "NIPQUAD_FMT":%u: %s (%d)\n",
+			__FUNCTION__, NIPQUAD(myaddr.sin_addr),
+			port, err ? "failed" : "ok", err);
+	return err;
+}
+
+static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
+{
+	struct sockaddr_in6 myaddr = {
+		.sin6_family = AF_INET6,
+	};
+	struct sockaddr_in6 *sa;
+	int err;
+	unsigned short port = transport->port;
+
+	if (!transport->xprt.resvport)
+		port = 0;
+	sa = (struct sockaddr_in6 *)&transport->addr;
+	myaddr.sin6_addr = sa->sin6_addr;
+	do {
+		myaddr.sin6_port = htons(port);
+		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
+						sizeof(myaddr));
+		if (!transport->xprt.resvport)
+			break;
+		if (err == 0) {
+			transport->port = port;
+			break;
+		}
+		if (port <= xprt_min_resvport)
+			port = xprt_max_resvport;
+		else
+			port--;
+	} while (err == -EADDRINUSE && port != transport->port);
+	dprintk("RPC:       xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
+		NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
 	return err;
 }
 
@@ -1183,64 +1346,36 @@
 static struct lock_class_key xs_key[2];
 static struct lock_class_key xs_slock_key[2];
 
-static inline void xs_reclassify_socket(struct socket *sock)
+static inline void xs_reclassify_socket4(struct socket *sock)
 {
 	struct sock *sk = sock->sk;
+
 	BUG_ON(sock_owned_by_user(sk));
-	switch (sk->sk_family) {
-	case AF_INET:
-		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFS",
-			&xs_slock_key[0], "sk_lock-AF_INET-NFS", &xs_key[0]);
-		break;
+	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
+		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
+}
 
-	case AF_INET6:
-		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFS",
-			&xs_slock_key[1], "sk_lock-AF_INET6-NFS", &xs_key[1]);
-		break;
+static inline void xs_reclassify_socket6(struct socket *sock)
+{
+	struct sock *sk = sock->sk;
 
-	default:
-		BUG();
-	}
+	BUG_ON(sock_owned_by_user(sk));
+	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
+		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
 }
 #else
-static inline void xs_reclassify_socket(struct socket *sock)
+static inline void xs_reclassify_socket4(struct socket *sock)
+{
+}
+
+static inline void xs_reclassify_socket6(struct socket *sock)
 {
 }
 #endif
 
-/**
- * xs_udp_connect_worker - set up a UDP socket
- * @work: RPC transport to connect
- *
- * Invoked by a work queue tasklet.
- */
-static void xs_udp_connect_worker(struct work_struct *work)
+static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
-	struct sock_xprt *transport =
-		container_of(work, struct sock_xprt, connect_worker.work);
-	struct rpc_xprt *xprt = &transport->xprt;
-	struct socket *sock = transport->sock;
-	int err, status = -EIO;
-
-	if (xprt->shutdown || !xprt_bound(xprt))
-		goto out;
-
-	/* Start by resetting any existing state */
-	xs_close(xprt);
-
-	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
-		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
-		goto out;
-	}
-	xs_reclassify_socket(sock);
-
-	if (xs_bind(transport, sock)) {
-		sock_release(sock);
-		goto out;
-	}
-
-	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
-			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
 	if (!transport->inet) {
 		struct sock *sk = sock->sk;
@@ -1265,6 +1400,84 @@
 		write_unlock_bh(&sk->sk_callback_lock);
 	}
 	xs_udp_do_set_buffer_size(xprt);
+}
+
+/**
+ * xs_udp_connect_worker4 - set up a UDP socket
+ * @work: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_udp_connect_worker4(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, connect_worker.work);
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct socket *sock = transport->sock;
+	int err, status = -EIO;
+
+	if (xprt->shutdown || !xprt_bound(xprt))
+		goto out;
+
+	/* Start by resetting any existing state */
+	xs_close(xprt);
+
+	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
+		goto out;
+	}
+	xs_reclassify_socket4(sock);
+
+	if (xs_bind4(transport, sock)) {
+		sock_release(sock);
+		goto out;
+	}
+
+	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
+			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+
+	xs_udp_finish_connecting(xprt, sock);
+	status = 0;
+out:
+	xprt_wake_pending_tasks(xprt, status);
+	xprt_clear_connecting(xprt);
+}
+
+/**
+ * xs_udp_connect_worker6 - set up a UDP socket
+ * @work: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_udp_connect_worker6(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, connect_worker.work);
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct socket *sock = transport->sock;
+	int err, status = -EIO;
+
+	if (xprt->shutdown || !xprt_bound(xprt))
+		goto out;
+
+	/* Start by resetting any existing state */
+	xs_close(xprt);
+
+	if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
+		goto out;
+	}
+	xs_reclassify_socket6(sock);
+
+	if (xs_bind6(transport, sock) < 0) {
+		sock_release(sock);
+		goto out;
+	}
+
+	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
+			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+
+	xs_udp_finish_connecting(xprt, sock);
 	status = 0;
 out:
 	xprt_wake_pending_tasks(xprt, status);
@@ -1295,42 +1508,9 @@
 				result);
 }
 
-/**
- * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
- * @work: RPC transport to connect
- *
- * Invoked by a work queue tasklet.
- */
-static void xs_tcp_connect_worker(struct work_struct *work)
+static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
-	struct sock_xprt *transport =
-		container_of(work, struct sock_xprt, connect_worker.work);
-	struct rpc_xprt *xprt = &transport->xprt;
-	struct socket *sock = transport->sock;
-	int err, status = -EIO;
-
-	if (xprt->shutdown || !xprt_bound(xprt))
-		goto out;
-
-	if (!sock) {
-		/* start from scratch */
-		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
-			dprintk("RPC:       can't create TCP transport "
-					"socket (%d).\n", -err);
-			goto out;
-		}
-		xs_reclassify_socket(sock);
-
-		if (xs_bind(transport, sock)) {
-			sock_release(sock);
-			goto out;
-		}
-	} else
-		/* "close" the socket, preserving the local port */
-		xs_tcp_reuse_connection(xprt);
-
-	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
-			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
 	if (!transport->inet) {
 		struct sock *sk = sock->sk;
@@ -1364,8 +1544,46 @@
 	/* Tell the socket layer to start connecting... */
 	xprt->stat.connect_count++;
 	xprt->stat.connect_start = jiffies;
-	status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
-			xprt->addrlen, O_NONBLOCK);
+	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
+}
+
+/**
+ * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
+ * @work: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_tcp_connect_worker4(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, connect_worker.work);
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct socket *sock = transport->sock;
+	int err, status = -EIO;
+
+	if (xprt->shutdown || !xprt_bound(xprt))
+		goto out;
+
+	if (!sock) {
+		/* start from scratch */
+		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
+			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
+			goto out;
+		}
+		xs_reclassify_socket4(sock);
+
+		if (xs_bind4(transport, sock) < 0) {
+			sock_release(sock);
+			goto out;
+		}
+	} else
+		/* "close" the socket, preserving the local port */
+		xs_tcp_reuse_connection(xprt);
+
+	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
+			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+
+	status = xs_tcp_finish_connecting(xprt, sock);
 	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
 			xprt, -status, xprt_connected(xprt),
 			sock->sk->sk_state);
@@ -1391,6 +1609,66 @@
 }
 
 /**
+ * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
+ * @work: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_tcp_connect_worker6(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, connect_worker.work);
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct socket *sock = transport->sock;
+	int err, status = -EIO;
+
+	if (xprt->shutdown || !xprt_bound(xprt))
+		goto out;
+
+	if (!sock) {
+		/* start from scratch */
+		if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
+			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
+			goto out;
+		}
+		xs_reclassify_socket6(sock);
+
+		if (xs_bind6(transport, sock) < 0) {
+			sock_release(sock);
+			goto out;
+		}
+	} else
+		/* "close" the socket, preserving the local port */
+		xs_tcp_reuse_connection(xprt);
+
+	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
+			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
+
+	status = xs_tcp_finish_connecting(xprt, sock);
+	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
+			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
+	if (status < 0) {
+		switch (status) {
+			case -EINPROGRESS:
+			case -EALREADY:
+				goto out_clear;
+			case -ECONNREFUSED:
+			case -ECONNRESET:
+				/* retry with existing socket, after a delay */
+				break;
+			default:
+				/* get rid of existing socket, and retry */
+				xs_close(xprt);
+				break;
+		}
+	}
+out:
+	xprt_wake_pending_tasks(xprt, status);
+out_clear:
+	xprt_clear_connecting(xprt);
+}
+
+/**
  * xs_connect - connect a socket to a remote endpoint
  * @task: address of RPC task that manages state of connect request
  *
@@ -1508,7 +1786,8 @@
 	.print_stats		= xs_tcp_print_stats,
 };
 
-static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned int slot_table_size)
+static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
+				      unsigned int slot_table_size)
 {
 	struct rpc_xprt *xprt;
 	struct sock_xprt *new;
@@ -1549,8 +1828,9 @@
  * @args: rpc transport creation arguments
  *
  */
-struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
+struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
 {
+	struct sockaddr *addr = args->dstaddr;
 	struct rpc_xprt *xprt;
 	struct sock_xprt *transport;
 
@@ -1559,15 +1839,11 @@
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
 
-	if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
-		xprt_set_bound(xprt);
-
 	xprt->prot = IPPROTO_UDP;
 	xprt->tsh_size = 0;
 	/* XXX: header size can vary due to auth type, IPv6, etc. */
 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
-	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker);
 	xprt->bind_timeout = XS_BIND_TO;
 	xprt->connect_timeout = XS_UDP_CONN_TO;
 	xprt->reestablish_timeout = XS_UDP_REEST_TO;
@@ -1580,11 +1856,37 @@
 	else
 		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
 
-	xs_format_peer_addresses(xprt);
+	switch (addr->sa_family) {
+	case AF_INET:
+		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
+			xprt_set_bound(xprt);
+
+		INIT_DELAYED_WORK(&transport->connect_worker,
+					xs_udp_connect_worker4);
+		xs_format_ipv4_peer_addresses(xprt);
+		break;
+	case AF_INET6:
+		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
+			xprt_set_bound(xprt);
+
+		INIT_DELAYED_WORK(&transport->connect_worker,
+					xs_udp_connect_worker6);
+		xs_format_ipv6_peer_addresses(xprt);
+		break;
+	default:
+		kfree(xprt);
+		return ERR_PTR(-EAFNOSUPPORT);
+	}
+
 	dprintk("RPC:       set up transport to address %s\n",
 			xprt->address_strings[RPC_DISPLAY_ALL]);
 
-	return xprt;
+	if (try_module_get(THIS_MODULE))
+		return xprt;
+
+	kfree(xprt->slot);
+	kfree(xprt);
+	return ERR_PTR(-EINVAL);
 }
 
 /**
@@ -1592,8 +1894,9 @@
  * @args: rpc transport creation arguments
  *
  */
-struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
+struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
 {
+	struct sockaddr *addr = args->dstaddr;
 	struct rpc_xprt *xprt;
 	struct sock_xprt *transport;
 
@@ -1602,14 +1905,10 @@
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
 
-	if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
-		xprt_set_bound(xprt);
-
 	xprt->prot = IPPROTO_TCP;
 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
-	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker);
 	xprt->bind_timeout = XS_BIND_TO;
 	xprt->connect_timeout = XS_TCP_CONN_TO;
 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
@@ -1622,15 +1921,55 @@
 	else
 		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
 
-	xs_format_peer_addresses(xprt);
+	switch (addr->sa_family) {
+	case AF_INET:
+		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
+			xprt_set_bound(xprt);
+
+		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
+		xs_format_ipv4_peer_addresses(xprt);
+		break;
+	case AF_INET6:
+		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
+			xprt_set_bound(xprt);
+
+		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
+		xs_format_ipv6_peer_addresses(xprt);
+		break;
+	default:
+		kfree(xprt);
+		return ERR_PTR(-EAFNOSUPPORT);
+	}
+
 	dprintk("RPC:       set up transport to address %s\n",
 			xprt->address_strings[RPC_DISPLAY_ALL]);
 
-	return xprt;
+	if (try_module_get(THIS_MODULE))
+		return xprt;
+
+	kfree(xprt->slot);
+	kfree(xprt);
+	return ERR_PTR(-EINVAL);
 }
 
+static struct xprt_class	xs_udp_transport = {
+	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
+	.name		= "udp",
+	.owner		= THIS_MODULE,
+	.ident		= IPPROTO_UDP,
+	.setup		= xs_setup_udp,
+};
+
+static struct xprt_class	xs_tcp_transport = {
+	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
+	.name		= "tcp",
+	.owner		= THIS_MODULE,
+	.ident		= IPPROTO_TCP,
+	.setup		= xs_setup_tcp,
+};
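
With the transport switch introduced by this merge, any transport module follows the same pattern as the two registrations above: fill in an xprt_class with a unique .ident and a .setup routine, then register it at module init. A hedged sketch for a hypothetical third transport (the xs_foo_* names and the .ident value are made up for illustration):

/* Hypothetical third transport module. */
static struct rpc_xprt *xs_setup_foo(struct xprt_create *args);

static struct xprt_class xs_foo_transport = {
	.list	= LIST_HEAD_INIT(xs_foo_transport.list),
	.name	= "foo",
	.owner	= THIS_MODULE,
	.ident	= 255,		/* must be unique among transports */
	.setup	= xs_setup_foo,
};

static int __init xs_foo_init(void)
{
	return xprt_register_transport(&xs_foo_transport);
}

static void __exit xs_foo_exit(void)
{
	xprt_unregister_transport(&xs_foo_transport);
}
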
+
 /**
- * init_socket_xprt - set up xprtsock's sysctls
+ * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
  *
  */
 int init_socket_xprt(void)
@@ -1640,11 +1979,14 @@
 		sunrpc_table_header = register_sysctl_table(sunrpc_table);
 #endif
 
+	xprt_register_transport(&xs_udp_transport);
+	xprt_register_transport(&xs_tcp_transport);
+
 	return 0;
 }
 
 /**
- * cleanup_socket_xprt - remove xprtsock's sysctls
+ * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
  *
  */
 void cleanup_socket_xprt(void)
@@ -1655,4 +1997,7 @@
 		sunrpc_table_header = NULL;
 	}
 #endif
+
+	xprt_unregister_transport(&xs_udp_transport);
+	xprt_unregister_transport(&xs_tcp_transport);
 }