sgi-xp: set up the activate GRU message queue

Set up the activate GRU message queue that is used for partition activation
and channel connection on UV systems.
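
Along the way, convert the count of received-but-unprocessed activate IRQs
from an atomic_t to a spinlock-protected int, have
xpc_process_activate_IRQ_rcvd() drain that count itself instead of taking an
expected-IRQ argument, and pull the common partition and channel structure
initialization out of xpc_init() into new xpc_setup_partitions() and
xpc_setup_ch_structures() helpers with matching teardown routines.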

Signed-off-by: Dean Nelson <dcn@sgi.com>
Cc: Jack Steiner <steiner@sgi.com>
Cc: "Luck, Tony" <tony.luck@intel.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
diff --git a/drivers/misc/sgi-xp/xpc_main.c b/drivers/misc/sgi-xp/xpc_main.c
index b303c13..13ec479 100644
--- a/drivers/misc/sgi-xp/xpc_main.c
+++ b/drivers/misc/sgi-xp/xpc_main.c
@@ -141,8 +141,9 @@
 /* non-zero if any remote partition disengage was timed out */
 int xpc_disengage_timedout;
 
-/* #of activate IRQs received */
-atomic_t xpc_activate_IRQ_rcvd = ATOMIC_INIT(0);
+/* #of activate IRQs received and not yet processed */
+int xpc_activate_IRQ_rcvd;
+DEFINE_SPINLOCK(xpc_activate_IRQ_rcvd_lock);
 
 /* IRQ handler notifies this wait queue on receipt of an IRQ */
 DECLARE_WAIT_QUEUE_HEAD(xpc_activate_IRQ_wq);
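
With the counter demoted from an atomic_t to a plain int, every update has to
happen under xpc_activate_IRQ_rcvd_lock. A minimal sketch of the producer
side, assuming a handler shaped like the sn2/uv ones (the function name here
is hypothetical; the real handlers live in the backend files):

	static irqreturn_t
	xpc_handle_activate_IRQ(int irq, void *dev_id)
	{
		unsigned long irq_flags;

		/* bump the not-yet-processed count under the new lock */
		spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
		xpc_activate_IRQ_rcvd++;
		spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);

		/* wake xpc_hb_checker() so it processes the IRQ promptly */
		wake_up_interruptible(&xpc_activate_IRQ_wq);
		return IRQ_HANDLED;
	}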
@@ -169,10 +170,11 @@
 	.notifier_call = xpc_system_die,
 };
 
+int (*xpc_setup_partitions_sn) (void);
 enum xp_retval (*xpc_get_partition_rsvd_page_pa) (void *buf, u64 *cookie,
 						  unsigned long *rp_pa,
 						  size_t *len);
-enum xp_retval (*xpc_rsvd_page_init) (struct xpc_rsvd_page *rp);
+int (*xpc_setup_rsvd_page_sn) (struct xpc_rsvd_page *rp);
 void (*xpc_heartbeat_init) (void);
 void (*xpc_heartbeat_exit) (void);
 void (*xpc_increment_heartbeat) (void);
@@ -183,8 +185,8 @@
 enum xp_retval (*xpc_make_first_contact) (struct xpc_partition *part);
 void (*xpc_notify_senders_of_disconnect) (struct xpc_channel *ch);
 u64 (*xpc_get_chctl_all_flags) (struct xpc_partition *part);
-enum xp_retval (*xpc_allocate_msgqueues) (struct xpc_channel *ch);
-void (*xpc_free_msgqueues) (struct xpc_channel *ch);
+enum xp_retval (*xpc_setup_msg_structures) (struct xpc_channel *ch);
+void (*xpc_teardown_msg_structures) (struct xpc_channel *ch);
 void (*xpc_process_msg_chctl_flags) (struct xpc_partition *part, int ch_number);
 int (*xpc_n_of_deliverable_msgs) (struct xpc_channel *ch);
 struct xpc_msg *(*xpc_get_deliverable_msg) (struct xpc_channel *ch);
@@ -196,9 +198,9 @@
 void (*xpc_request_partition_deactivation) (struct xpc_partition *part);
 void (*xpc_cancel_partition_deactivation_request) (struct xpc_partition *part);
 
-void (*xpc_process_activate_IRQ_rcvd) (int n_IRQs_expected);
-enum xp_retval (*xpc_setup_infrastructure) (struct xpc_partition *part);
-void (*xpc_teardown_infrastructure) (struct xpc_partition *part);
+void (*xpc_process_activate_IRQ_rcvd) (void);
+enum xp_retval (*xpc_setup_ch_structures_sn) (struct xpc_partition *part);
+void (*xpc_teardown_ch_structures_sn) (struct xpc_partition *part);
 
 void (*xpc_indicate_partition_engaged) (struct xpc_partition *part);
 int (*xpc_partition_engaged) (short partid);
@@ -215,6 +217,9 @@
 void (*xpc_send_chctl_openreply) (struct xpc_channel *ch,
 				  unsigned long *irq_flags);
 
+void (*xpc_save_remote_msgqueue_pa) (struct xpc_channel *ch,
+				     unsigned long msgqueue_pa);
+
 enum xp_retval (*xpc_send_msg) (struct xpc_channel *ch, u32 flags,
 				void *payload, u16 payload_size, u8 notify_type,
 				xpc_notify_func func, void *key);
@@ -308,8 +313,6 @@
 static int
 xpc_hb_checker(void *ignore)
 {
-	int last_IRQ_count = 0;
-	int new_IRQ_count;
 	int force_IRQ = 0;
 
 	/* this thread was marked active by xpc_hb_init() */
@@ -325,43 +328,37 @@
 		dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have "
 			"been received\n",
 			(int)(xpc_hb_check_timeout - jiffies),
-			atomic_read(&xpc_activate_IRQ_rcvd) - last_IRQ_count);
+			xpc_activate_IRQ_rcvd);
 
 		/* checking of remote heartbeats is skewed by IRQ handling */
 		if (time_is_before_eq_jiffies(xpc_hb_check_timeout)) {
+			xpc_hb_check_timeout = jiffies +
+			    (xpc_hb_check_interval * HZ);
+
 			dev_dbg(xpc_part, "checking remote heartbeats\n");
 			xpc_check_remote_hb();
 
 			/*
-			 * We need to periodically recheck to ensure no
-			 * IRQ/amo pairs have been missed.  That check
-			 * must always reset xpc_hb_check_timeout.
+			 * On sn2 we need to periodically recheck to ensure no
+			 * IRQ/amo pairs have been missed.
 			 */
-			force_IRQ = 1;
+			if (is_shub())
+				force_IRQ = 1;
 		}
 
 		/* check for outstanding IRQs */
-		new_IRQ_count = atomic_read(&xpc_activate_IRQ_rcvd);
-		if (last_IRQ_count < new_IRQ_count || force_IRQ != 0) {
+		if (xpc_activate_IRQ_rcvd > 0 || force_IRQ != 0) {
 			force_IRQ = 0;
-
-			dev_dbg(xpc_part, "found an IRQ to process; will be "
-				"resetting xpc_hb_check_timeout\n");
-
-			xpc_process_activate_IRQ_rcvd(new_IRQ_count -
-						      last_IRQ_count);
-			last_IRQ_count = new_IRQ_count;
-
-			xpc_hb_check_timeout = jiffies +
-			    (xpc_hb_check_interval * HZ);
+			dev_dbg(xpc_part, "processing activate IRQs "
+				"received\n");
+			xpc_process_activate_IRQ_rcvd();
 		}
 
 		/* wait for IRQ or timeout */
 		(void)wait_event_interruptible(xpc_activate_IRQ_wq,
-					       (last_IRQ_count < atomic_read(
-						&xpc_activate_IRQ_rcvd)
-						|| time_is_before_eq_jiffies(
+					       (time_is_before_eq_jiffies(
 						xpc_hb_check_timeout) ||
+						xpc_activate_IRQ_rcvd > 0 ||
 						xpc_exiting));
 	}
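
Note that xpc_hb_checker() no longer tracks a last-seen IRQ count and no
longer resets the heartbeat-check timeout when IRQs arrive; the timeout is
now reset up front, and force_IRQ is only raised on sn2 (is_shub()). The
draining of the counter is left to xpc_process_activate_IRQ_rcvd(), which
now takes no argument; presumably each backend snapshots and clears the
count under the same lock, roughly:

	int n_IRQs;
	unsigned long irq_flags;

	spin_lock_irqsave(&xpc_activate_IRQ_rcvd_lock, irq_flags);
	n_IRQs = xpc_activate_IRQ_rcvd;		/* snapshot ... */
	xpc_activate_IRQ_rcvd = 0;		/* ... then clear */
	spin_unlock_irqrestore(&xpc_activate_IRQ_rcvd_lock, irq_flags);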
 
@@ -437,6 +434,153 @@
 }
 
 /*
+ * Guarantee that the kzalloc'd memory is cacheline aligned.
+ */
+void *
+xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
+{
+	/* see if kzalloc will give us cacheline aligned memory by default */
+	*base = kzalloc(size, flags);
+	if (*base == NULL)
+		return NULL;
+
+	if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
+		return *base;
+
+	kfree(*base);
+
+	/* nope, we'll have to do it ourselves */
+	*base = kzalloc(size + L1_CACHE_BYTES, flags);
+	if (*base == NULL)
+		return NULL;
+
+	return (void *)L1_CACHE_ALIGN((u64)*base);
+}
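
The aligned pointer handed back may not be the address kzalloc() returned,
so callers must free through *base rather than through the aligned pointer,
as the openclose-args allocation below demonstrates. A sketch of the
calling pattern (struct name hypothetical):

	void *base;
	struct xpc_foo *p;

	p = xpc_kzalloc_cacheline_aligned(sizeof(*p), GFP_KERNEL, &base);
	if (p == NULL)
		return xpNoMemory;
	/* ... use p ... */
	kfree(base);	/* not kfree(p) */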
+
+/*
+ * Set up the channel structures necessary to support XPartition Communication
+ * between the specified remote partition and the local one.
+ */
+static enum xp_retval
+xpc_setup_ch_structures(struct xpc_partition *part)
+{
+	enum xp_retval ret;
+	int ch_number;
+	struct xpc_channel *ch;
+	short partid = XPC_PARTID(part);
+
+	/*
+	 * Allocate all of the channel structures as a contiguous chunk of
+	 * memory.
+	 */
+	DBUG_ON(part->channels != NULL);
+	part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_MAX_NCHANNELS,
+				 GFP_KERNEL);
+	if (part->channels == NULL) {
+		dev_err(xpc_chan, "can't get memory for channels\n");
+		return xpNoMemory;
+	}
+
+	/* allocate the remote open and close args */
+
+	part->remote_openclose_args =
+	    xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE,
+					  GFP_KERNEL, &part->
+					  remote_openclose_args_base);
+	if (part->remote_openclose_args == NULL) {
+		dev_err(xpc_chan, "can't get memory for remote connect args\n");
+		ret = xpNoMemory;
+		goto out_1;
+	}
+
+	part->chctl.all_flags = 0;
+	spin_lock_init(&part->chctl_lock);
+
+	atomic_set(&part->channel_mgr_requests, 1);
+	init_waitqueue_head(&part->channel_mgr_wq);
+
+	part->nchannels = XPC_MAX_NCHANNELS;
+
+	atomic_set(&part->nchannels_active, 0);
+	atomic_set(&part->nchannels_engaged, 0);
+
+	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
+		ch = &part->channels[ch_number];
+
+		ch->partid = partid;
+		ch->number = ch_number;
+		ch->flags = XPC_C_DISCONNECTED;
+
+		atomic_set(&ch->kthreads_assigned, 0);
+		atomic_set(&ch->kthreads_idle, 0);
+		atomic_set(&ch->kthreads_active, 0);
+
+		atomic_set(&ch->references, 0);
+		atomic_set(&ch->n_to_notify, 0);
+
+		spin_lock_init(&ch->lock);
+		init_completion(&ch->wdisconnect_wait);
+
+		atomic_set(&ch->n_on_msg_allocate_wq, 0);
+		init_waitqueue_head(&ch->msg_allocate_wq);
+		init_waitqueue_head(&ch->idle_wq);
+	}
+
+	ret = xpc_setup_ch_structures_sn(part);
+	if (ret != xpSuccess)
+		goto out_2;
+
+	/*
+	 * With the setting of the partition setup_state to XPC_P_SS_SETUP,
+	 * we're declaring that this partition is ready to go.
+	 */
+	part->setup_state = XPC_P_SS_SETUP;
+
+	return xpSuccess;
+
+	/* setup of ch structures failed */
+out_2:
+	kfree(part->remote_openclose_args_base);
+	part->remote_openclose_args = NULL;
+out_1:
+	kfree(part->channels);
+	part->channels = NULL;
+	return ret;
+}
+
+/*
+ * Tear down the channel structures used to support XPartition Communication
+ * between the specified remote partition and the local one.
+ */
+static void
+xpc_teardown_ch_structures(struct xpc_partition *part)
+{
+	DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
+	DBUG_ON(atomic_read(&part->nchannels_active) != 0);
+
+	/*
+	 * Make this partition inaccessible to local processes by marking it
+	 * as no longer set up. Then wait before proceeding with the teardown
+	 * until all existing references cease.
+	 */
+	DBUG_ON(part->setup_state != XPC_P_SS_SETUP);
+	part->setup_state = XPC_P_SS_WTEARDOWN;
+
+	wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));
+
+	/* now we can begin tearing down the infrastructure */
+
+	xpc_teardown_ch_structures_sn(part);
+
+	kfree(part->remote_openclose_args_base);
+	part->remote_openclose_args = NULL;
+	kfree(part->channels);
+	part->channels = NULL;
+
+	part->setup_state = XPC_P_SS_TORNDOWN;
+}
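
Between them these helpers drive part->setup_state through its whole life
cycle: XPC_P_SS_UNSET until setup succeeds, XPC_P_SS_SETUP once the
partition is declared ready, XPC_P_SS_WTEARDOWN while waiting for
part->references to drain, and XPC_P_SS_TORNDOWN at the end.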
+
+/*
  * When XPC HB determines that a partition has come up, it will create a new
  * kthread and that kthread will call this function to attempt to set up the
  * basic infrastructure used for Cross Partition Communication with the newly
@@ -476,7 +620,7 @@
 
 	xpc_allow_hb(partid);
 
-	if (xpc_setup_infrastructure(part) == xpSuccess) {
+	if (xpc_setup_ch_structures(part) == xpSuccess) {
 		(void)xpc_part_ref(part);	/* this will always succeed */
 
 		if (xpc_make_first_contact(part) == xpSuccess) {
@@ -486,7 +630,7 @@
 		}
 
 		xpc_part_deref(part);
-		xpc_teardown_infrastructure(part);
+		xpc_teardown_ch_structures(part);
 	}
 
 	xpc_disallow_hb(partid);
@@ -806,6 +950,56 @@
 	}
 }
 
+static int
+xpc_setup_partitions(void)
+{
+	short partid;
+	struct xpc_partition *part;
+
+	xpc_partitions = kzalloc(sizeof(struct xpc_partition) *
+				 xp_max_npartitions, GFP_KERNEL);
+	if (xpc_partitions == NULL) {
+		dev_err(xpc_part, "can't get memory for partition structure\n");
+		return -ENOMEM;
+	}
+
+	/*
+	 * The first few fields of each entry of xpc_partitions[] need to
+	 * be initialized now so that calls to xpc_connect() and
+	 * xpc_disconnect() can be made prior to the activation of any remote
+	 * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
+	 * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
+	 * PARTITION HAS BEEN ACTIVATED.
+	 */
+	for (partid = 0; partid < xp_max_npartitions; partid++) {
+		part = &xpc_partitions[partid];
+
+		DBUG_ON((u64)part != L1_CACHE_ALIGN((u64)part));
+
+		part->activate_IRQ_rcvd = 0;
+		spin_lock_init(&part->act_lock);
+		part->act_state = XPC_P_AS_INACTIVE;
+		XPC_SET_REASON(part, 0, 0);
+
+		init_timer(&part->disengage_timer);
+		part->disengage_timer.function =
+		    xpc_timeout_partition_disengage;
+		part->disengage_timer.data = (unsigned long)part;
+
+		part->setup_state = XPC_P_SS_UNSET;
+		init_waitqueue_head(&part->teardown_wq);
+		atomic_set(&part->references, 0);
+	}
+
+	return xpc_setup_partitions_sn();
+}
+
+static void
+xpc_teardown_partitions(void)
+{
+	kfree(xpc_partitions);
+}
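
xpc_setup_partitions() finishes by calling the backend's
xpc_setup_partitions_sn() hook, which is presumably where the UV code sets
up the activate GRU message queue named in the subject.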
+
 static void
 xpc_do_exit(enum xp_retval reason)
 {
@@ -892,8 +1086,7 @@
 	DBUG_ON(xpc_any_partition_engaged());
 	DBUG_ON(xpc_any_hbs_allowed() != 0);
 
-	/* a zero timestamp indicates our rsvd page is not initialized */
-	xpc_rsvd_page->ts_jiffies = 0;
+	xpc_teardown_rsvd_page();
 
 	if (reason == xpUnloading) {
 		(void)unregister_die_notifier(&xpc_die_notifier);
@@ -906,7 +1099,7 @@
 	if (xpc_sysctl)
 		unregister_sysctl_table(xpc_sysctl);
 
-	kfree(xpc_partitions);
+	xpc_teardown_partitions();
 
 	if (is_shub())
 		xpc_exit_sn2();
@@ -1062,8 +1255,6 @@
 xpc_init(void)
 {
 	int ret;
-	short partid;
-	struct xpc_partition *part;
 	struct task_struct *kthread;
 
 	snprintf(xpc_part->bus_id, BUS_ID_SIZE, "part");
@@ -1076,56 +1267,29 @@
 		 * further to only support exactly 64 partitions on this
 		 * architecture, no less.
 		 */
-		if (xp_max_npartitions != 64)
-			return -EINVAL;
-
-		ret = xpc_init_sn2();
-		if (ret != 0)
-			return ret;
+		if (xp_max_npartitions != 64) {
+			dev_err(xpc_part, "max #of partitions not set to 64\n");
+			ret = -EINVAL;
+		} else {
+			ret = xpc_init_sn2();
+		}
 
 	} else if (is_uv()) {
-		xpc_init_uv();
+		ret = xpc_init_uv();
 
 	} else {
-		return -ENODEV;
+		ret = -ENODEV;
 	}
 
-	xpc_partitions = kzalloc(sizeof(struct xpc_partition) *
-				 xp_max_npartitions, GFP_KERNEL);
-	if (xpc_partitions == NULL) {
+	if (ret != 0)
+		return ret;
+
+	ret = xpc_setup_partitions();
+	if (ret != 0) {
 		dev_err(xpc_part, "can't get memory for partition structure\n");
-		ret = -ENOMEM;
 		goto out_1;
 	}
 
-	/*
-	 * The first few fields of each entry of xpc_partitions[] need to
-	 * be initialized now so that calls to xpc_connect() and
-	 * xpc_disconnect() can be made prior to the activation of any remote
-	 * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
-	 * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
-	 * PARTITION HAS BEEN ACTIVATED.
-	 */
-	for (partid = 0; partid < xp_max_npartitions; partid++) {
-		part = &xpc_partitions[partid];
-
-		DBUG_ON((u64)part != L1_CACHE_ALIGN((u64)part));
-
-		part->activate_IRQ_rcvd = 0;
-		spin_lock_init(&part->act_lock);
-		part->act_state = XPC_P_AS_INACTIVE;
-		XPC_SET_REASON(part, 0, 0);
-
-		init_timer(&part->disengage_timer);
-		part->disengage_timer.function =
-		    xpc_timeout_partition_disengage;
-		part->disengage_timer.data = (unsigned long)part;
-
-		part->setup_state = XPC_P_SS_UNSET;
-		init_waitqueue_head(&part->teardown_wq);
-		atomic_set(&part->references, 0);
-	}
-
 	xpc_sysctl = register_sysctl_table(xpc_sys_dir);
 
 	/*
@@ -1133,10 +1297,9 @@
 	 * other partitions to discover we are alive and establish initial
 	 * communications.
 	 */
-	xpc_rsvd_page = xpc_setup_rsvd_page();
-	if (xpc_rsvd_page == NULL) {
+	ret = xpc_setup_rsvd_page();
+	if (ret != 0) {
 		dev_err(xpc_part, "can't setup our reserved page\n");
-		ret = -EBUSY;
 		goto out_2;
 	}
 
@@ -1187,15 +1350,15 @@
 
 	/* initialization was not successful */
 out_3:
-	/* a zero timestamp indicates our rsvd page is not initialized */
-	xpc_rsvd_page->ts_jiffies = 0;
+	xpc_teardown_rsvd_page();
 
 	(void)unregister_die_notifier(&xpc_die_notifier);
 	(void)unregister_reboot_notifier(&xpc_reboot_notifier);
 out_2:
 	if (xpc_sysctl)
 		unregister_sysctl_table(xpc_sysctl);
-	kfree(xpc_partitions);
+
+	xpc_teardown_partitions();
 out_1:
 	if (is_shub())
 		xpc_exit_sn2();