Merge branch 'globalheartbeat-2' of git://oss.oracle.com/git/smushran/linux-2.6 into ocfs2-merge-window

Conflicts:
	fs/ocfs2/ocfs2.h
diff --git a/fs/ocfs2/acl.c b/fs/ocfs2/acl.c
index a76e0aa..3919150 100644
--- a/fs/ocfs2/acl.c
+++ b/fs/ocfs2/acl.c
@@ -209,7 +209,10 @@
 	}
 
 	inode->i_mode = new_mode;
+	inode->i_ctime = CURRENT_TIME;
 	di->i_mode = cpu_to_le16(inode->i_mode);
+	di->i_ctime = cpu_to_le64(inode->i_ctime.tv_sec);
+	di->i_ctime_nsec = cpu_to_le32(inode->i_ctime.tv_nsec);
 
 	ocfs2_journal_dirty(handle, di_bh);
 
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 41d5f1f..52c7557 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -62,10 +62,51 @@
 static LIST_HEAD(o2hb_node_events);
 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
 
+/*
+ * In global heartbeat, we maintain a series of region bitmaps.
+ * 	- o2hb_region_bitmap allows us to limit the region number to max region.
+ * 	- o2hb_live_region_bitmap tracks live regions (seen steady iterations).
+ * 	- o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
+ * 		heartbeat on it.
+ * 	- o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
+ */
+static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
+
+#define O2HB_DB_TYPE_LIVENODES		0
+#define O2HB_DB_TYPE_LIVEREGIONS	1
+#define O2HB_DB_TYPE_QUORUMREGIONS	2
+#define O2HB_DB_TYPE_FAILEDREGIONS	3
+#define O2HB_DB_TYPE_REGION_LIVENODES	4
+#define O2HB_DB_TYPE_REGION_NUMBER	5
+#define O2HB_DB_TYPE_REGION_ELAPSED_TIME	6
+struct o2hb_debug_buf {	/* handed to debugfs as a file's private data */
+	int db_type;	/* O2HB_DB_TYPE_*: selects what o2hb_debug_open() prints */
+	int db_size;	/* bytes of bitmap copied out for the bitmap types */
+	int db_len;	/* number of bits scanned when listing the set bits */
+	void *db_data;	/* a global bitmap, or the struct o2hb_region for region types */
+};
+
+static struct o2hb_debug_buf *o2hb_db_livenodes;
+static struct o2hb_debug_buf *o2hb_db_liveregions;
+static struct o2hb_debug_buf *o2hb_db_quorumregions;
+static struct o2hb_debug_buf *o2hb_db_failedregions;
+
 #define O2HB_DEBUG_DIR			"o2hb"
 #define O2HB_DEBUG_LIVENODES		"livenodes"
+#define O2HB_DEBUG_LIVEREGIONS		"live_regions"
+#define O2HB_DEBUG_QUORUMREGIONS	"quorum_regions"
+#define O2HB_DEBUG_FAILEDREGIONS	"failed_regions"
+#define O2HB_DEBUG_REGION_NUMBER	"num"
+#define O2HB_DEBUG_REGION_ELAPSED_TIME	"elapsed_time_in_ms"
+
 static struct dentry *o2hb_debug_dir;
 static struct dentry *o2hb_debug_livenodes;
+static struct dentry *o2hb_debug_liveregions;
+static struct dentry *o2hb_debug_quorumregions;
+static struct dentry *o2hb_debug_failedregions;
 
 static LIST_HEAD(o2hb_all_regions);
 
@@ -77,7 +118,19 @@
 
 #define O2HB_DEFAULT_BLOCK_BITS       9
 
+enum o2hb_heartbeat_modes {
+	O2HB_HEARTBEAT_LOCAL		= 0,	/* initial o2hb_heartbeat_mode */
+	O2HB_HEARTBEAT_GLOBAL,		/* what o2hb_global_heartbeat_active() tests for */
+	O2HB_HEARTBEAT_NUM_MODES,	/* count; sizes o2hb_heartbeat_mode_desc[] */
+};
+
+char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
+		"local",	/* O2HB_HEARTBEAT_LOCAL */
+		"global",	/* O2HB_HEARTBEAT_GLOBAL */
+};
+
 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
+unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
 
 /* Only sets a new threshold if there are no active regions.
  *
@@ -94,6 +147,22 @@
 	}
 }
 
+static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode)
+{
+	int ret = -EBUSY;	/* the mode is frozen while any region exists */
+
+	if (hb_mode >= O2HB_HEARTBEAT_NUM_MODES)
+		return -EINVAL;	/* not a known heartbeat mode */
+
+	spin_lock(&o2hb_live_lock);	/* also held when o2hb_all_regions changes */
+	if (list_empty(&o2hb_all_regions)) {
+		o2hb_heartbeat_mode = hb_mode;
+		ret = 0;
+	}
+	spin_unlock(&o2hb_live_lock);
+	return ret;	/* 0 on success, negative errno otherwise */
+}
+
 struct o2hb_node_event {
 	struct list_head        hn_item;
 	enum o2hb_callback_type hn_event_type;
@@ -135,6 +204,18 @@
 	struct block_device	*hr_bdev;
 	struct o2hb_disk_slot	*hr_slots;
 
+	/* live node map of this region */
+	unsigned long		hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
+	unsigned int		hr_region_num;
+
+	struct dentry		*hr_debug_dir;
+	struct dentry		*hr_debug_livenodes;
+	struct dentry		*hr_debug_regnum;
+	struct dentry		*hr_debug_elapsed_time;
+	struct o2hb_debug_buf	*hr_db_livenodes;
+	struct o2hb_debug_buf	*hr_db_regnum;
+	struct o2hb_debug_buf	*hr_db_elapsed_time;
+
 	/* let the person setting up hb wait for it to return until it
 	 * has reached a 'steady' state.  This will be fixed when we have
 	 * a more complete api that doesn't lead to this sort of fragility. */
@@ -163,8 +244,19 @@
 	int               wc_error;
 };
 
+static int o2hb_pop_count(void *map, int count)
+{
+	int nr_set = 0, bit = find_next_bit(map, count, 0);
+
+	for (; bit < count; bit = find_next_bit(map, count, bit + 1))
+		nr_set++;	/* one per set bit found below count */
+	return nr_set;
+}
+
 static void o2hb_write_timeout(struct work_struct *work)
 {
+	int failed, quorum;
+	unsigned long flags;
 	struct o2hb_region *reg =
 		container_of(work, struct o2hb_region,
 			     hr_write_timeout_work.work);
@@ -172,6 +264,28 @@
 	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 	     "milliseconds\n", reg->hr_dev_name,
 	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
+
+	if (o2hb_global_heartbeat_active()) {
+		spin_lock_irqsave(&o2hb_live_lock, flags);
+		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
+			set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
+		failed = o2hb_pop_count(&o2hb_failed_region_bitmap,
+					O2NM_MAX_REGIONS);
+		quorum = o2hb_pop_count(&o2hb_quorum_region_bitmap,
+					O2NM_MAX_REGIONS);
+		spin_unlock_irqrestore(&o2hb_live_lock, flags);
+
+		mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
+		     quorum, failed);
+
+		/*
+		 * Fence if the number of failed regions >= half the number
+		 * of  quorum regions
+		 */
+		if ((failed << 1) < quorum)
+			return;
+	}
+
 	o2quo_disk_timeout();
 }
 
@@ -180,6 +294,11 @@
 	mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
 	     O2HB_MAX_WRITE_TIMEOUT_MS);
 
+	if (o2hb_global_heartbeat_active()) {
+		spin_lock(&o2hb_live_lock);
+		clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
+		spin_unlock(&o2hb_live_lock);
+	}
 	cancel_delayed_work(&reg->hr_write_timeout_work);
 	reg->hr_last_timeout_start = jiffies;
 	schedule_delayed_work(&reg->hr_write_timeout_work,
@@ -513,6 +632,8 @@
 {
 	assert_spin_locked(&o2hb_live_lock);
 
+	BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
+
 	event->hn_event_type = type;
 	event->hn_node = node;
 	event->hn_node_num = node_num;
@@ -554,6 +675,35 @@
 	o2nm_node_put(node);
 }
 
+static void o2hb_set_quorum_device(struct o2hb_region *reg,
+				   struct o2hb_disk_slot *slot)
+{
+	unsigned int num = reg->hr_region_num;
+
+	/* Caller holds o2hb_live_lock; only global mode tracks quorum regions. */
+	assert_spin_locked(&o2hb_live_lock);
+	if (!o2hb_global_heartbeat_active() ||
+	    test_bit(num, o2hb_quorum_region_bitmap))
+		return;
+
+	/*
+	 * Promote the region only once its own live-node map is identical to
+	 * the global live-node map, i.e. every live node heartbeats on it.
+	 */
+	if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
+		   sizeof(o2hb_live_node_bitmap)) != 0)
+		return;
+
+	/* ...and the slot has reached O2HB_LIVE_THRESHOLD changed samples. */
+	if (slot->ds_changed_samples < O2HB_LIVE_THRESHOLD)
+		return;
+
+	printk(KERN_NOTICE "o2hb: Region %s is now a quorum device\n",
+	       config_item_name(&reg->hr_item));
+
+	set_bit(num, o2hb_quorum_region_bitmap);
+}
+
 static int o2hb_check_slot(struct o2hb_region *reg,
 			   struct o2hb_disk_slot *slot)
 {
@@ -565,14 +715,22 @@
 	u64 cputime;
 	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
 	unsigned int slot_dead_ms;
+	int tmp;
 
 	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
 
-	/* Is this correct? Do we assume that the node doesn't exist
-	 * if we're not configured for him? */
+	/*
+	 * If a node is no longer configured but is still in the livemap, we
+	 * may need to clear that bit from the livemap.
+	 */
 	node = o2nm_get_node_by_num(slot->ds_node_num);
-	if (!node)
-		return 0;
+	if (!node) {
+		spin_lock(&o2hb_live_lock);
+		tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
+		spin_unlock(&o2hb_live_lock);
+		if (!tmp)
+			return 0;
+	}
 
 	if (!o2hb_verify_crc(reg, hb_block)) {
 		/* all paths from here will drop o2hb_live_lock for
@@ -639,8 +797,12 @@
 		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
 		     slot->ds_node_num, (long long)slot->ds_last_generation);
 
+		set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
+
 		/* first on the list generates a callback */
 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
+			mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
+			     "bitmap\n", slot->ds_node_num);
 			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
 			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
@@ -684,13 +846,18 @@
 		mlog(ML_HEARTBEAT, "Node %d left my region\n",
 		     slot->ds_node_num);
 
+		clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
+
 		/* last off the live_slot generates a callback */
 		list_del_init(&slot->ds_live_item);
 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
+			mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
+			     "nodes bitmap\n", slot->ds_node_num);
 			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
-			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
-					      slot->ds_node_num);
+			/* node can be null */
+			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
+					      node, slot->ds_node_num);
 
 			changed = 1;
 		}
@@ -706,11 +873,14 @@
 		slot->ds_equal_samples = 0;
 	}
 out:
+	o2hb_set_quorum_device(reg, slot);
+
 	spin_unlock(&o2hb_live_lock);
 
 	o2hb_run_event_list(&event);
 
-	o2nm_node_put(node);
+	if (node)
+		o2nm_node_put(node);
 	return changed;
 }
 
@@ -737,6 +907,7 @@
 {
 	int i, ret, highest_node, change = 0;
 	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
+	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	struct o2hb_bio_wait_ctxt write_wc;
 
 	ret = o2nm_configured_node_map(configured_nodes,
@@ -746,6 +917,17 @@
 		return ret;
 	}
 
+	/*
+	 * If a node is not configured but is in the livemap, we still need
+	 * to read the slot so as to be able to remove it from the livemap.
+	 */
+	o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
+	i = -1;
+	while ((i = find_next_bit(live_node_bitmap,
+				  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
+		set_bit(i, configured_nodes);
+	}
+
 	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
 	if (highest_node >= O2NM_MAX_NODES) {
 		mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
@@ -917,21 +1099,59 @@
 #ifdef CONFIG_DEBUG_FS
 static int o2hb_debug_open(struct inode *inode, struct file *file)
 {
+	struct o2hb_debug_buf *db = inode->i_private;
+	struct o2hb_region *reg;
 	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
 	char *buf = NULL;
 	int i = -1;
 	int out = 0;
 
+	/* max_nodes should be the largest bitmap we pass here */
+	BUG_ON(sizeof(map) < db->db_size);
+
 	buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
 	if (!buf)
 		goto bail;
 
-	o2hb_fill_node_map(map, sizeof(map));
+	switch (db->db_type) {
+	case O2HB_DB_TYPE_LIVENODES:
+	case O2HB_DB_TYPE_LIVEREGIONS:
+	case O2HB_DB_TYPE_QUORUMREGIONS:
+	case O2HB_DB_TYPE_FAILEDREGIONS:
+		spin_lock(&o2hb_live_lock);
+		memcpy(map, db->db_data, db->db_size);
+		spin_unlock(&o2hb_live_lock);
+		break;
 
-	while ((i = find_next_bit(map, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES)
+	case O2HB_DB_TYPE_REGION_LIVENODES:
+		spin_lock(&o2hb_live_lock);
+		reg = (struct o2hb_region *)db->db_data;
+		memcpy(map, reg->hr_live_node_bitmap, db->db_size);
+		spin_unlock(&o2hb_live_lock);
+		break;
+
+	case O2HB_DB_TYPE_REGION_NUMBER:
+		reg = (struct o2hb_region *)db->db_data;
+		out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
+				reg->hr_region_num);
+		goto done;
+
+	case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
+		reg = (struct o2hb_region *)db->db_data;
+		out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
+				jiffies_to_msecs(jiffies -
+						 reg->hr_last_timeout_start));
+		goto done;
+
+	default:
+		goto done;
+	}
+
+	while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
 		out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
 	out += snprintf(buf + out, PAGE_SIZE - out, "\n");
 
+done:
 	i_size_write(inode, out);
 
 	file->private_data = buf;
@@ -978,10 +1198,104 @@
 
 void o2hb_exit(void)
 {
-	if (o2hb_debug_livenodes)
-		debugfs_remove(o2hb_debug_livenodes);
-	if (o2hb_debug_dir)
-		debugfs_remove(o2hb_debug_dir);
+	debugfs_remove(o2hb_debug_failedregions);	/* files first, directory last */
+	debugfs_remove(o2hb_debug_quorumregions);
+	debugfs_remove(o2hb_debug_liveregions);
+	debugfs_remove(o2hb_debug_livenodes);
+	debugfs_remove(o2hb_debug_dir);
+	kfree(o2hb_db_livenodes);	/* buffers last: o2hb_debug_open() reads them */
+	kfree(o2hb_db_liveregions);
+	kfree(o2hb_db_quorumregions);
+	kfree(o2hb_db_failedregions);
+}
+
+static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
+					struct o2hb_debug_buf **db, int db_len,
+					int type, int size, int len, void *data)
+{
+	struct o2hb_debug_buf *buf = kmalloc(db_len, GFP_KERNEL);
+
+	*db = buf;	/* stays set even if the file creation below fails */
+	if (!buf)
+		return NULL;
+	buf->db_type = type;
+	buf->db_size = size;
+	buf->db_len = len;
+	buf->db_data = data;
+	return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, buf,
+				   &o2hb_debug_fops);
+}
+
+static int o2hb_debug_init(void)
+{
+	int ret = -ENOMEM;	/* every failure below is reported as -ENOMEM */
+
+	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);	/* "o2hb" directory */
+	if (!o2hb_debug_dir) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
+						 o2hb_debug_dir,
+						 &o2hb_db_livenodes,
+						 sizeof(*o2hb_db_livenodes),
+						 O2HB_DB_TYPE_LIVENODES,
+						 sizeof(o2hb_live_node_bitmap),
+						 O2NM_MAX_NODES,
+						 o2hb_live_node_bitmap);
+	if (!o2hb_debug_livenodes) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
+						   o2hb_debug_dir,
+						   &o2hb_db_liveregions,
+						   sizeof(*o2hb_db_liveregions),
+						   O2HB_DB_TYPE_LIVEREGIONS,
+						   sizeof(o2hb_live_region_bitmap),
+						   O2NM_MAX_REGIONS,
+						   o2hb_live_region_bitmap);
+	if (!o2hb_debug_liveregions) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_quorumregions =
+			o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
+					  o2hb_debug_dir,
+					  &o2hb_db_quorumregions,
+					  sizeof(*o2hb_db_quorumregions),
+					  O2HB_DB_TYPE_QUORUMREGIONS,
+					  sizeof(o2hb_quorum_region_bitmap),
+					  O2NM_MAX_REGIONS,
+					  o2hb_quorum_region_bitmap);
+	if (!o2hb_debug_quorumregions) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	o2hb_debug_failedregions =
+			o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
+					  o2hb_debug_dir,
+					  &o2hb_db_failedregions,
+					  sizeof(*o2hb_db_failedregions),
+					  O2HB_DB_TYPE_FAILEDREGIONS,
+					  sizeof(o2hb_failed_region_bitmap),
+					  O2NM_MAX_REGIONS,
+					  o2hb_failed_region_bitmap);
+	if (!o2hb_debug_failedregions) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	ret = 0;	/* directory and all four files were created */
+bail:
+	if (ret)
+		o2hb_exit();	/* frees the buffers and removes whatever was created */
+
+	return ret;
 }
 
 int o2hb_init(void)
@@ -997,24 +1311,12 @@
 	INIT_LIST_HEAD(&o2hb_node_events);
 
 	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
+	memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
+	memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
+	memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
+	memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
 
-	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
-	if (!o2hb_debug_dir) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
-	}
-
-	o2hb_debug_livenodes = debugfs_create_file(O2HB_DEBUG_LIVENODES,
-						   S_IFREG|S_IRUSR,
-						   o2hb_debug_dir, NULL,
-						   &o2hb_debug_fops);
-	if (!o2hb_debug_livenodes) {
-		mlog_errno(-ENOMEM);
-		debugfs_remove(o2hb_debug_dir);
-		return -ENOMEM;
-	}
-
-	return 0;
+	return o2hb_debug_init();
 }
 
 /* if we're already in a callback then we're already serialized by the sem */
@@ -1078,6 +1380,13 @@
 	if (reg->hr_slots)
 		kfree(reg->hr_slots);
 
+	kfree(reg->hr_db_regnum);
+	kfree(reg->hr_db_livenodes);
+	debugfs_remove(reg->hr_debug_livenodes);
+	debugfs_remove(reg->hr_debug_regnum);
+	debugfs_remove(reg->hr_debug_elapsed_time);
+	debugfs_remove(reg->hr_debug_dir);
+
 	spin_lock(&o2hb_live_lock);
 	list_del(&reg->hr_all_item);
 	spin_unlock(&o2hb_live_lock);
@@ -1441,6 +1750,8 @@
 	/* Ok, we were woken.  Make sure it wasn't by drop_item() */
 	spin_lock(&o2hb_live_lock);
 	hb_task = reg->hr_task;
+	if (o2hb_global_heartbeat_active())
+		set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
 	spin_unlock(&o2hb_live_lock);
 
 	if (hb_task)
@@ -1448,6 +1759,10 @@
 	else
 		ret = -EIO;
 
+	if (hb_task && o2hb_global_heartbeat_active())
+		printk(KERN_NOTICE "o2hb: Heartbeat started on region %s\n",
+		       config_item_name(&reg->hr_item));
+
 out:
 	if (filp)
 		fput(filp);
@@ -1586,21 +1901,94 @@
 		: NULL;
 }
 
+static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
+{
+	int ret = -ENOMEM;	/* every failure below is reported as -ENOMEM */
+
+	reg->hr_debug_dir =	/* per-region directory, named after the config item */
+		debugfs_create_dir(config_item_name(&reg->hr_item), dir);
+	if (!reg->hr_debug_dir) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	reg->hr_debug_livenodes =
+			o2hb_debug_create(O2HB_DEBUG_LIVENODES,
+					  reg->hr_debug_dir,
+					  &(reg->hr_db_livenodes),
+					  sizeof(*(reg->hr_db_livenodes)),
+					  O2HB_DB_TYPE_REGION_LIVENODES,
+					  sizeof(reg->hr_live_node_bitmap),
+					  O2NM_MAX_NODES, reg);	/* db_data is the region itself */
+	if (!reg->hr_debug_livenodes) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	reg->hr_debug_regnum =
+			o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
+					  reg->hr_debug_dir,
+					  &(reg->hr_db_regnum),
+					  sizeof(*(reg->hr_db_regnum)),
+					  O2HB_DB_TYPE_REGION_NUMBER,
+					  0, O2NM_MAX_NODES, reg);
+	if (!reg->hr_debug_regnum) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	reg->hr_debug_elapsed_time =
+			o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
+					  reg->hr_debug_dir,
+					  &(reg->hr_db_elapsed_time),	/* NOTE(review): no kfree() of this buffer is visible in the region release hunk - confirm it is freed */
+					  sizeof(*(reg->hr_db_elapsed_time)),
+					  O2HB_DB_TYPE_REGION_ELAPSED_TIME,
+					  0, 0, reg);
+	if (!reg->hr_debug_elapsed_time) {
+		mlog_errno(ret);
+		goto bail;
+	}
+
+	ret = 0;
+bail:
+	return ret;	/* nothing is undone here; the caller drops the item on failure */
+}
+
 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
 							  const char *name)
 {
 	struct o2hb_region *reg = NULL;
+	int ret;
 
 	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
 	if (reg == NULL)
 		return ERR_PTR(-ENOMEM);
 
-	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
+	if (strlen(name) > O2HB_MAX_REGION_NAME_LEN)
+		return ERR_PTR(-ENAMETOOLONG);
 
 	spin_lock(&o2hb_live_lock);
+	reg->hr_region_num = 0;
+	if (o2hb_global_heartbeat_active()) {
+		reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
+							 O2NM_MAX_REGIONS);
+		if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
+			spin_unlock(&o2hb_live_lock);
+			return ERR_PTR(-EFBIG);
+		}
+		set_bit(reg->hr_region_num, o2hb_region_bitmap);
+	}
 	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
 	spin_unlock(&o2hb_live_lock);
 
+	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
+
+	ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
+	if (ret) {
+		config_item_put(&reg->hr_item);
+		return ERR_PTR(ret);
+	}
+
 	return &reg->hr_item;
 }
 
@@ -1612,6 +2000,10 @@
 
 	/* stop the thread when the user removes the region dir */
 	spin_lock(&o2hb_live_lock);
+	if (o2hb_global_heartbeat_active()) {
+		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
+		clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
+	}
 	hb_task = reg->hr_task;
 	reg->hr_task = NULL;
 	spin_unlock(&o2hb_live_lock);
@@ -1628,6 +2020,9 @@
 		wake_up(&o2hb_steady_queue);
 	}
 
+	if (o2hb_global_heartbeat_active())
+		printk(KERN_NOTICE "o2hb: Heartbeat stopped on region %s\n",
+		       config_item_name(&reg->hr_item));
 	config_item_put(item);
 }
 
@@ -1688,6 +2083,41 @@
 	return count;
 }
 
+static ssize_t
+o2hb_heartbeat_group_mode_show(struct o2hb_heartbeat_group *group, char *page)
+{
+	const char *desc = o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode];
+
+	return sprintf(page, "%s\n", desc);	/* current mode name + newline */
+}
+
+static ssize_t
+o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group,
+				const char *page, size_t count)
+{
+	size_t len = count;
+	unsigned int mode;
+
+	if (page[count - 1] == '\n')	/* ignore one trailing newline */
+		len--;
+	if (!len)
+		return -EINVAL;
+
+	/* Only the first len bytes are compared, ignoring case. */
+	for (mode = 0; mode < O2HB_HEARTBEAT_NUM_MODES; ++mode) {
+		const char *desc = o2hb_heartbeat_mode_desc[mode];
+
+		if (strnicmp(page, desc, len) != 0)
+			continue;
+		if (o2hb_global_hearbeat_mode_set(mode) == 0)
+			printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
+			       desc);
+		return count;	/* returned even when the mode change was refused */
+	}
+
+	return -EINVAL;
+}
+
 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
 	.attr	= { .ca_owner = THIS_MODULE,
 		    .ca_name = "dead_threshold",
@@ -1696,8 +2126,17 @@
 	.store	= o2hb_heartbeat_group_threshold_store,
 };
 
+static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_mode = {
+	.attr   = { .ca_owner = THIS_MODULE,
+		.ca_name = "mode",	/* attribute file name */
+		.ca_mode = S_IRUGO | S_IWUSR },	/* readable by all, writable by owner */
+	.show   = o2hb_heartbeat_group_mode_show,	/* prints the current mode name */
+	.store  = o2hb_heartbeat_group_mode_store,	/* selects a mode by name */
+};
+
 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
 	&o2hb_heartbeat_group_attr_threshold.attr,
+	&o2hb_heartbeat_group_attr_mode.attr,
 	NULL,
 };
 
@@ -1963,3 +2402,34 @@
 	spin_unlock(&o2hb_live_lock);
 }
 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
+
+/* Copy region names into region_uuids; returns how many regions exist. */
+int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
+{
+	struct o2hb_region *reg;
+	char *p = region_uuids;
+	int numregs = 0;
+
+	spin_lock(&o2hb_live_lock);	/* keeps o2hb_all_regions stable */
+	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
+		const char *name = config_item_name(&reg->hr_item);
+
+		mlog(0, "Region: %s\n", name);
+		if (numregs < max_regions) {
+			/* Fixed-width slot: short names are NUL-padded,
+			 * never read past their terminator. */
+			strncpy(p, name, O2HB_MAX_REGION_NAME_LEN);
+			p += O2HB_MAX_REGION_NAME_LEN;
+		}
+		numregs++;	/* counts regions beyond max_regions too */
+	}
+	spin_unlock(&o2hb_live_lock);
+	return numregs;
+}
+EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
+
+int o2hb_global_heartbeat_active(void)
+{
+	return o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL;	/* 1 in global mode, else 0 */
+}
+EXPORT_SYMBOL(o2hb_global_heartbeat_active);
diff --git a/fs/ocfs2/cluster/heartbeat.h b/fs/ocfs2/cluster/heartbeat.h
index 2f16492..00ad8e8 100644
--- a/fs/ocfs2/cluster/heartbeat.h
+++ b/fs/ocfs2/cluster/heartbeat.h
@@ -31,6 +31,8 @@
 
 #define O2HB_REGION_TIMEOUT_MS		2000
 
+#define O2HB_MAX_REGION_NAME_LEN	32
+
 /* number of changes to be seen as live */
 #define O2HB_LIVE_THRESHOLD	   2
 /* number of equal samples to be seen as dead */
@@ -81,5 +83,7 @@
 int o2hb_check_node_heartbeating_from_callback(u8 node_num);
 int o2hb_check_local_node_heartbeating(void);
 void o2hb_stop_all_regions(void);
+int o2hb_get_all_regions(char *region_uuids, u8 numregions);
+int o2hb_global_heartbeat_active(void);
 
 #endif /* O2CLUSTER_HEARTBEAT_H */
diff --git a/fs/ocfs2/cluster/masklog.h b/fs/ocfs2/cluster/masklog.h
index fd96e2a..ea2ed9f 100644
--- a/fs/ocfs2/cluster/masklog.h
+++ b/fs/ocfs2/cluster/masklog.h
@@ -119,7 +119,8 @@
 #define ML_ERROR	0x0000000100000000ULL /* sent to KERN_ERR */
 #define ML_NOTICE	0x0000000200000000ULL /* setn to KERN_NOTICE */
 #define ML_KTHREAD	0x0000000400000000ULL /* kernel thread activity */
-#define	ML_RESERVATIONS	0x0000000800000000ULL /* ocfs2 alloc reservations */
+#define ML_RESERVATIONS	0x0000000800000000ULL /* ocfs2 alloc reservations */
+#define ML_CLUSTER	0x0000001000000000ULL /* cluster stack */
 
 #define MLOG_INITIAL_AND_MASK (ML_ERROR|ML_NOTICE)
 #define MLOG_INITIAL_NOT_MASK (ML_ENTRY|ML_EXIT)
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index ed0c9f3..bb24064 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -711,6 +711,8 @@
 	config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
 	spin_lock_init(&node->nd_lock);
 
+	mlog(ML_CLUSTER, "o2nm: Registering node %s\n", name);
+
 	return &node->nd_item;
 }
 
@@ -744,6 +746,9 @@
 	}
 	write_unlock(&cluster->cl_nodes_lock);
 
+	mlog(ML_CLUSTER, "o2nm: Unregistered node %s\n",
+	     config_item_name(&node->nd_item));
+
 	config_item_put(item);
 }
 
diff --git a/fs/ocfs2/cluster/ocfs2_nodemanager.h b/fs/ocfs2/cluster/ocfs2_nodemanager.h
index 5b9854b..49b5943 100644
--- a/fs/ocfs2/cluster/ocfs2_nodemanager.h
+++ b/fs/ocfs2/cluster/ocfs2_nodemanager.h
@@ -36,4 +36,10 @@
 /* host name, group name, cluster name all 64 bytes */
 #define O2NM_MAX_NAME_LEN        64    // __NEW_UTS_LEN
 
+/*
+ * Maximum number of global heartbeat regions allowed.
+ * **CAUTION**  Changing this number will break dlm compatibility.
+ */
+#define O2NM_MAX_REGIONS	32
+
 #endif /* _OCFS2_NODEMANAGER_H */
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index 1361997..9aa426e 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -977,7 +977,7 @@
 int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
 			   size_t caller_veclen, u8 target_node, int *status)
 {
-	int ret;
+	int ret = 0;
 	struct o2net_msg *msg = NULL;
 	size_t veclen, caller_bytes = 0;
 	struct kvec *vec = NULL;
@@ -1696,6 +1696,9 @@
 {
 	o2quo_hb_down(node_num);
 
+	if (!node)
+		return;
+
 	if (node_num != o2nm_this_node())
 		o2net_disconnect_node(node);
 
@@ -1709,6 +1712,8 @@
 
 	o2quo_hb_up(node_num);
 
+	BUG_ON(!node);
+
 	/* ensure an immediate connect attempt */
 	nn->nn_last_connect_attempt = jiffies -
 		(msecs_to_jiffies(o2net_reconnect_delay()) + 1);
diff --git a/fs/ocfs2/dir.c b/fs/ocfs2/dir.c
index f04ebcf..c49f6de 100644
--- a/fs/ocfs2/dir.c
+++ b/fs/ocfs2/dir.c
@@ -3931,6 +3931,15 @@
 		goto out_commit;
 	}
 
+	cpos = split_hash;
+	ret = ocfs2_dx_dir_new_cluster(dir, &et, cpos, handle,
+				       data_ac, meta_ac, new_dx_leaves,
+				       num_dx_leaves);
+	if (ret) {
+		mlog_errno(ret);
+		goto out_commit;
+	}
+
 	for (i = 0; i < num_dx_leaves; i++) {
 		ret = ocfs2_journal_access_dl(handle, INODE_CACHE(dir),
 					      orig_dx_leaves[i],
@@ -3939,15 +3948,14 @@
 			mlog_errno(ret);
 			goto out_commit;
 		}
-	}
 
-	cpos = split_hash;
-	ret = ocfs2_dx_dir_new_cluster(dir, &et, cpos, handle,
-				       data_ac, meta_ac, new_dx_leaves,
-				       num_dx_leaves);
-	if (ret) {
-		mlog_errno(ret);
-		goto out_commit;
+		ret = ocfs2_journal_access_dl(handle, INODE_CACHE(dir),
+					      new_dx_leaves[i],
+					      OCFS2_JOURNAL_ACCESS_WRITE);
+		if (ret) {
+			mlog_errno(ret);
+			goto out_commit;
+		}
 	}
 
 	ocfs2_dx_dir_transfer_leaf(dir, split_hash, handle, tmp_dx_leaf,
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 4b6ae2c..b36d0bf 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -445,7 +445,9 @@
 	DLM_LOCK_REQUEST_MSG,	 /* 515 */
 	DLM_RECO_DATA_DONE_MSG,	 /* 516 */
 	DLM_BEGIN_RECO_MSG,	 /* 517 */
-	DLM_FINALIZE_RECO_MSG	 /* 518 */
+	DLM_FINALIZE_RECO_MSG,	 /* 518 */
+	DLM_QUERY_REGION,	 /* 519 */
+	DLM_QUERY_NODEINFO,	 /* 520 */
 };
 
 struct dlm_reco_node_data
@@ -727,6 +729,31 @@
 	u8 domain[O2NM_MAX_NAME_LEN];
 };
 
+struct dlm_query_region {
+	u8 qr_node;	/* sender's node number; logged as the joining node */
+	u8 qr_numregions;	/* number of names present in qr_regions */
+	u8 qr_namelen;	/* presumably the length of qr_domain - TODO confirm */
+	u8 pad1;
+	u8 qr_domain[O2NM_MAX_NAME_LEN];	/* domain name */
+	u8 qr_regions[O2HB_MAX_REGION_NAME_LEN * O2NM_MAX_REGIONS];	/* fixed-width names, back to back */
+};
+
+struct dlm_node_info {
+	u8 ni_nodenum;	/* presumably the o2nm node number - TODO confirm */
+	u8 pad1;
+	u16 ni_ipv4_port;	/* NOTE(review): byte order not visible here - confirm */
+	u32 ni_ipv4_address;	/* NOTE(review): byte order not visible here - confirm */
+};
+
+struct dlm_query_nodeinfo {
+	u8 qn_nodenum;	/* presumably the sender's node number - TODO confirm */
+	u8 qn_numnodes;	/* presumably the count of valid qn_nodes entries - TODO confirm */
+	u8 qn_namelen;	/* presumably the length of qn_domain - TODO confirm */
+	u8 pad1;
+	u8 qn_domain[O2NM_MAX_NAME_LEN];
+	struct dlm_node_info qn_nodes[O2NM_MAX_NODES];	/* room for one entry per possible node */
+};
+
 struct dlm_exit_domain
 {
 	u8 node_idx;
@@ -1030,6 +1057,7 @@
 			 struct dlm_lock_resource *res);
 void dlm_clean_master_list(struct dlm_ctxt *dlm,
 			   u8 dead_node);
+void dlm_force_free_mles(struct dlm_ctxt *dlm);
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
 int __dlm_lockres_unused(struct dlm_lock_resource *res);
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 26ff2e1..272ec86 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -636,8 +636,14 @@
 	spin_lock(&dlm->track_lock);
 	if (oldres)
 		track_list = &oldres->tracking;
-	else
+	else {
 		track_list = &dlm->tracking_list;
+		if (list_empty(track_list)) {
+			dl = NULL;
+			spin_unlock(&dlm->track_lock);
+			goto bail;
+		}
+	}
 
 	list_for_each_entry(res, track_list, tracking) {
 		if (&res->tracking == &dlm->tracking_list)
@@ -660,6 +666,7 @@
 	} else
 		dl = NULL;
 
+bail:
 	/* passed to seq_show */
 	return dl;
 }
@@ -775,7 +782,9 @@
 
 	/* Domain: xxxxxxxxxx  Key: 0xdfbac769 */
 	out += snprintf(db->buf + out, db->len - out,
-			"Domain: %s  Key: 0x%08x\n", dlm->name, dlm->key);
+			"Domain: %s  Key: 0x%08x  Protocol: %d.%d\n",
+			dlm->name, dlm->key, dlm->dlm_locking_proto.pv_major,
+			dlm->dlm_locking_proto.pv_minor);
 
 	/* Thread Pid: xxx  Node: xxx  State: xxxxx */
 	out += snprintf(db->buf + out, db->len - out,
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 153abb5..58a93b9 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -128,10 +128,14 @@
  * will have a negotiated version with the same major number and a minor
  * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
  * be used to determine what a running domain is actually using.
+ *
+ * New in version 1.1:
+ *	- Message DLM_QUERY_REGION added to support global heartbeat
+ *	- Message DLM_QUERY_NODEINFO added to allow online node removes
  */
 static const struct dlm_protocol_version dlm_protocol = {
 	.pv_major = 1,
-	.pv_minor = 0,
+	.pv_minor = 1,
 };
 
 #define DLM_DOMAIN_BACKOFF_MS 200
@@ -142,6 +146,8 @@
 				     void **ret_data);
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data);
+static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
+				    void *data, void **ret_data);
 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data);
 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
@@ -693,6 +699,7 @@
 
 		dlm_mark_domain_leaving(dlm);
 		dlm_leave_domain(dlm);
+		dlm_force_free_mles(dlm);
 		dlm_complete_dlm_shutdown(dlm);
 	}
 	dlm_put(dlm);
@@ -920,6 +927,370 @@
 	return 0;
 }
 
+/*
+ * Check that the hb regions sent by the joining node in @qr are exactly the
+ * ones registered locally (no global heartbeat on either side also matches).
+ * Returns 0 on a match, -EINVAL on a mismatch, -ENOMEM if kmalloc() fails.
+ * Must not sleep: dlm_query_region_handler() calls this with spinlocks held.
+ */
+static int dlm_match_regions(struct dlm_ctxt *dlm,
+			     struct dlm_query_region *qr)
+{
+	char *local = NULL, *remote = qr->qr_regions, *l, *r;
+	int localnr, i, j, foundit, status = 0;
+
+	if (!o2hb_global_heartbeat_active()) {
+		if (qr->qr_numregions) {
+			mlog(ML_ERROR, "Domain %s: Joining node %d has global "
+			     "heartbeat enabled but local node %d does not\n",
+			     qr->qr_domain, qr->qr_node, dlm->node_num);
+			status = -EINVAL;
+		}
+		goto bail;
+	}
+
+	if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
+		mlog(ML_ERROR, "Domain %s: Local node %d has global "
+		     "heartbeat enabled but joining node %d does not\n",
+		     qr->qr_domain, dlm->node_num, qr->qr_node);
+		status = -EINVAL;
+		goto bail;
+	}
+
+	r = remote;
+	for (i = 0; i < qr->qr_numregions; ++i, r += O2HB_MAX_REGION_NAME_LEN)
+		mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
+
+	/* GFP_ATOMIC: the caller holds spinlocks, so we cannot sleep here */
+	local = kmalloc(sizeof(qr->qr_regions), GFP_ATOMIC);
+	if (!local) {
+		status = -ENOMEM;
+		goto bail;
+	}
+
+	localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
+
+	/* compare local regions with the qr_numregions remote ones */
+	for (i = 0, l = local; i < localnr; ++i) {
+		foundit = 0;
+		r = remote;
+		for (j = 0; j < qr->qr_numregions; ++j) {
+			if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
+				foundit = 1;
+				break;
+			}
+			r += O2HB_MAX_REGION_NAME_LEN;
+		}
+		if (!foundit) {
+			status = -EINVAL;
+			mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
+			     "in local node %d but not in joining node %d\n",
+			     qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
+			     dlm->node_num, qr->qr_node);
+			goto bail;
+		}
+		l += O2HB_MAX_REGION_NAME_LEN;
+	}
+
+	/* compare remote with local regions */
+	for (i = 0, r = remote; i < qr->qr_numregions; ++i) {
+		foundit = 0;
+		l = local;
+		for (j = 0; j < localnr; ++j) {
+			if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
+				foundit = 1;
+				break;
+			}
+			l += O2HB_MAX_REGION_NAME_LEN;
+		}
+		if (!foundit) {
+			status = -EINVAL;
+			mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
+			     "in joining node %d but not in local node %d\n",
+			     qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
+			     qr->qr_node, dlm->node_num);
+			goto bail;
+		}
+		r += O2HB_MAX_REGION_NAME_LEN;
+	}
+
+bail:
+	kfree(local);
+	return status;
+}
+
+/*
+ * Send this node's heartbeat region list (empty with local heartbeat) to
+ * every other node set in @node_map via DLM_QUERY_REGION.  Returns 0 on
+ * success or the first send/remote error, after which sending stops.
+ */
+static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+	struct dlm_query_region *qr;
+	int remote_status, ret = 0, n;
+	char *name;
+
+	if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+		return 0;
+
+	qr = kzalloc(sizeof(*qr), GFP_KERNEL);
+	if (!qr) {
+		mlog_errno(-ENOMEM);
+		return -ENOMEM;
+	}
+
+	qr->qr_node = dlm->node_num;
+	qr->qr_namelen = strlen(dlm->name);
+	memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
+	/* with local heartbeat qr_numregions stays zero from kzalloc() */
+	if (o2hb_global_heartbeat_active())
+		qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
+							 O2NM_MAX_REGIONS);
+
+	name = qr->qr_regions;
+	for (n = 0; n < qr->qr_numregions; ++n, name += O2HB_MAX_REGION_NAME_LEN)
+		mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, name);
+
+	for (n = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+	     n < O2NM_MAX_NODES;
+	     n = find_next_bit(node_map, O2NM_MAX_NODES, n + 1)) {
+		if (n == dlm->node_num)
+			continue;
+
+		mlog(0, "Sending regions to node %d\n", n);
+		ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
+					 sizeof(*qr), n, &remote_status);
+		if (ret >= 0)
+			ret = remote_status;
+		if (ret) {
+			mlog(ML_ERROR, "Region mismatch %d, node %d\n", ret, n);
+			break;
+		}
+	}
+
+	kfree(qr);
+	return ret;
+}
+
+/*
+ * o2net handler for DLM_QUERY_REGION: a joining node asks us to compare its
+ * heartbeat regions with ours.  Returns 0 when dlm_match_regions() accepts
+ * them, otherwise -EINVAL (unknown domain, sender is not the joining node,
+ * protocol 1.0) or whatever dlm_match_regions() returned.
+ */
+static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
+				    void *data, void **ret_data)
+{
+	struct dlm_query_region *qr = (struct dlm_query_region *) msg->buf;
+	struct dlm_ctxt *dlm;
+	int status = -EINVAL;
+
+	mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
+	     qr->qr_domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
+	if (!dlm) {
+		mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+		     "before join domain\n", qr->qr_node, qr->qr_domain);
+		goto out_domain;
+	}
+
+	spin_lock(&dlm->spinlock);
+	if (dlm->joining_node != qr->qr_node) {
+		mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+		     "but joining node is %d\n", qr->qr_node, qr->qr_domain,
+		     dlm->joining_node);
+		goto out_dlm;
+	}
+
+	/* Support for global heartbeat was added in 1.1 */
+	if (dlm->dlm_locking_proto.pv_major == 1 &&
+	    dlm->dlm_locking_proto.pv_minor == 0) {
+		mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
+		     "but active dlm protocol is %d.%d\n", qr->qr_node,
+		     qr->qr_domain, dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor);
+		goto out_dlm;
+	}
+
+	status = dlm_match_regions(dlm, qr);
+
+out_dlm:
+	spin_unlock(&dlm->spinlock);
+out_domain:
+	spin_unlock(&dlm_domain_lock);
+
+	return status;
+}
+
+/*
+ * Compare the node table sent by a joining node in @qn with the local o2nm
+ * nodes, one node number at a time.  Returns 0 if every node exists on both
+ * sides with the same number, port and address, -EINVAL otherwise.
+ */
+static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
+{
+	struct o2nm_node *local;
+	struct dlm_node_info *remote;
+	int num, k, status = 0;
+
+	for (k = 0; k < qn->qn_numnodes; ++k)
+		mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[k].ni_nodenum,
+		     &(qn->qn_nodes[k].ni_ipv4_address),
+		     ntohs(qn->qn_nodes[k].ni_ipv4_port));
+
+	for (num = 0; num < O2NM_MAX_NODES && !status; ++num) {
+		local = o2nm_get_node_by_num(num);
+		remote = NULL;
+		for (k = 0; k < qn->qn_numnodes && !remote; ++k)
+			if (qn->qn_nodes[k].ni_nodenum == num)
+				remote = &(qn->qn_nodes[k]);
+
+		/* node number unused on both sides */
+		if (!local && !remote)
+			continue;
+
+		if (!local || !remote)
+			status = -EINVAL;
+		else if ((remote->ni_nodenum != local->nd_num) ||
+			 (remote->ni_ipv4_port != local->nd_ipv4_port) ||
+			 (remote->ni_ipv4_address != local->nd_ipv4_address))
+			status = -EINVAL;
+
+		if (status) {
+			if (!local)
+				mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+				     "registered in joining node %d but not in "
+				     "local node %d\n", qn->qn_domain,
+				     remote->ni_nodenum,
+				     &(remote->ni_ipv4_address),
+				     ntohs(remote->ni_ipv4_port),
+				     qn->qn_nodenum, dlm->node_num);
+			if (!remote)
+				mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+				     "registered in local node %d but not in "
+				     "joining node %d\n", qn->qn_domain,
+				     local->nd_num, &(local->nd_ipv4_address),
+				     ntohs(local->nd_ipv4_port),
+				     dlm->node_num, qn->qn_nodenum);
+			BUG_ON((!local && !remote));
+		}
+
+		if (local)
+			o2nm_node_put(local);
+	}
+
+	return status;
+}
+
+/*
+ * Send the local o2nm node table to every other node set in @node_map via
+ * DLM_QUERY_NODEINFO.  Returns 0 on success or the first send/remote error.
+ */
+static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+	struct dlm_query_nodeinfo *qn;
+	struct o2nm_node *node;
+	int ret = 0, remote_status, nr, n;
+
+	if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+		return 0;
+
+	qn = kzalloc(sizeof(*qn), GFP_KERNEL);
+	if (!qn) {
+		mlog_errno(-ENOMEM);
+		return -ENOMEM;
+	}
+
+	for (n = 0, nr = 0; n < O2NM_MAX_NODES; ++n) {
+		node = o2nm_get_node_by_num(n);
+		if (!node)
+			continue;
+		qn->qn_nodes[nr].ni_nodenum = node->nd_num;
+		qn->qn_nodes[nr].ni_ipv4_port = node->nd_ipv4_port;
+		qn->qn_nodes[nr].ni_ipv4_address = node->nd_ipv4_address;
+		mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
+		     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
+		++nr;
+		o2nm_node_put(node);
+	}
+
+	qn->qn_nodenum = dlm->node_num;
+	qn->qn_numnodes = nr;
+	qn->qn_namelen = strlen(dlm->name);
+	memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
+
+	for (n = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+	     n < O2NM_MAX_NODES;
+	     n = find_next_bit(node_map, O2NM_MAX_NODES, n + 1)) {
+		if (n == dlm->node_num)
+			continue;
+
+		mlog(0, "Sending nodeinfo to node %d\n", n);
+		ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+					 qn, sizeof(*qn), n, &remote_status);
+		if (ret >= 0)
+			ret = remote_status;
+		if (ret) {
+			mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, n);
+			break;
+		}
+	}
+
+	kfree(qn);
+	return ret;
+}
+
+/*
+ * o2net handler for DLM_QUERY_NODEINFO: check a joining node's node table
+ * against ours.  Returns dlm_match_nodes()'s result, or -EINVAL if rejected.
+ */
+static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
+				      void *data, void **ret_data)
+{
+	struct dlm_query_nodeinfo *qn = (struct dlm_query_nodeinfo *) msg->buf;
+	struct dlm_ctxt *dlm;
+	int status = -EINVAL;
+
+	mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
+	     qn->qn_domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
+	if (!dlm) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
+		     "join domain\n", qn->qn_nodenum, qn->qn_domain);
+		goto out_domain;
+	}
+
+	spin_lock(&dlm->spinlock);
+	if (dlm->joining_node != qn->qn_nodenum) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
+		     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
+		     dlm->joining_node);
+		goto out_dlm;
+	}
+
+	/* Support for node query was added in 1.1 */
+	if (dlm->dlm_locking_proto.pv_major == 1 &&
+	    dlm->dlm_locking_proto.pv_minor == 0) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s "
+		     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
+		     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor);
+		goto out_dlm;
+	}
+
+	status = dlm_match_nodes(dlm, qn);
+
+out_dlm:
+	spin_unlock(&dlm->spinlock);
+out_domain:
+	spin_unlock(&dlm_domain_lock);
+	return status;
+}
+
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data)
 {
@@ -1240,6 +1611,20 @@
 	set_bit(dlm->node_num, dlm->domain_map);
 	spin_unlock(&dlm->spinlock);
 
+	/* Support for global heartbeat and node info was added in 1.1 */
+	if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
+		status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
+		if (status) {
+			mlog_errno(status);
+			goto bail;
+		}
+		status = dlm_send_regions(dlm, ctxt->yes_resp_map);
+		if (status) {
+			mlog_errno(status);
+			goto bail;
+		}
+	}
+
 	dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
 
 	/* Joined state *must* be set before the joining node
@@ -1806,7 +2191,21 @@
 					sizeof(struct dlm_cancel_join),
 					dlm_cancel_join_handler,
 					NULL, NULL, &dlm_join_handlers);
+	if (status)
+		goto bail;
 
+	status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
+					sizeof(struct dlm_query_region),
+					dlm_query_region_handler,
+					NULL, NULL, &dlm_join_handlers);
+
+	if (status)
+		goto bail;
+
+	status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+					sizeof(struct dlm_query_nodeinfo),
+					dlm_query_nodeinfo_handler,
+					NULL, NULL, &dlm_join_handlers);
 bail:
 	if (status < 0)
 		dlm_unregister_net_handlers();
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index ffb4c68..f564b0e 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -3433,3 +3433,43 @@
 	wake_up(&res->wq);
 	wake_up(&dlm->migration_wq);
 }
+
+/*
+ * We notified all other nodes that we are exiting the domain and marked
+ * the dlm state DLM_CTXT_LEAVING.  Force-free any mles that are still
+ * around: wake the processes waiting on each mle, then call
+ * __dlm_unlink_mle(), __dlm_mle_detach_hb_events() and __dlm_put_mle() on it.
+ */
+void dlm_force_free_mles(struct dlm_ctxt *dlm)
+{
+	int i;
+	struct hlist_head *bucket;
+	struct dlm_master_list_entry *mle;
+	struct hlist_node *tmp, *list;
+
+	spin_lock(&dlm->spinlock);
+	spin_lock(&dlm->master_lock);
+	/* only legal while leaving, once domain_map has no bits set */
+	BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
+	BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
+
+	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+		bucket = dlm_master_hash(dlm, i);
+		hlist_for_each_safe(list, tmp, bucket) {
+			mle = hlist_entry(list, struct dlm_master_list_entry,
+					  master_hash_node);
+			if (mle->type != DLM_MLE_BLOCK) {
+				mlog(ML_ERROR, "bad mle: %p\n", mle);
+				dlm_print_one_mle(mle);
+			}
+			atomic_set(&mle->woken, 1);
+			wake_up(&mle->wq);
+			/* non-BLOCK mles are only logged above; all are freed */
+			__dlm_unlink_mle(dlm, mle);
+			__dlm_mle_detach_hb_events(dlm, mle);
+			__dlm_put_mle(mle);
+		}
+	}
+	spin_unlock(&dlm->master_lock);
+	spin_unlock(&dlm->spinlock);
+}
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index d1ce48e..1d596d8 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -84,6 +84,7 @@
 	OI_LS_PARENT,
 	OI_LS_RENAME1,
 	OI_LS_RENAME2,
+	OI_LS_REFLINK_TARGET,
 };
 
 int ocfs2_dlm_init(struct ocfs2_super *osb);
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 3064feef..d840821 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -250,7 +250,7 @@
 
 enum ocfs2_mount_options
 {
-	OCFS2_MOUNT_HB_LOCAL   = 1 << 0, /* Heartbeat started in local mode */
+	OCFS2_MOUNT_HB_LOCAL = 1 << 0, /* Local heartbeat */
 	OCFS2_MOUNT_BARRIER = 1 << 1,	/* Use block barriers */
 	OCFS2_MOUNT_NOINTR  = 1 << 2,   /* Don't catch signals */
 	OCFS2_MOUNT_ERRORS_PANIC = 1 << 3, /* Panic on errors */
@@ -263,9 +263,10 @@
 						   control lists */
 	OCFS2_MOUNT_USRQUOTA = 1 << 10, /* We support user quotas */
 	OCFS2_MOUNT_GRPQUOTA = 1 << 11, /* We support group quotas */
-
-	OCFS2_MOUNT_COHERENCY_BUFFERED = 1 << 12 /* Allow concurrent O_DIRECT
-						    writes */
+	OCFS2_MOUNT_COHERENCY_BUFFERED = 1 << 12, /* Allow concurrent O_DIRECT
+						     writes */
+	OCFS2_MOUNT_HB_NONE = 1 << 13, /* No heartbeat */
+	OCFS2_MOUNT_HB_GLOBAL = 1 << 14, /* Global heartbeat */
 };
 
 #define OCFS2_OSB_SOFT_RO			0x0001
@@ -379,6 +380,8 @@
 	struct ocfs2_alloc_stats alloc_stats;
 	char dev_str[20];		/* "major,minor" of the device */
 
+	u8 osb_stackflags;
+
 	char osb_cluster_stack[OCFS2_STACK_LABEL_LEN + 1];
 	struct ocfs2_cluster_connection *cconn;
 	struct ocfs2_lock_res osb_super_lockres;
@@ -612,10 +615,35 @@
 	return ret;
 }
 
-static inline int ocfs2_userspace_stack(struct ocfs2_super *osb)
+static inline int ocfs2_clusterinfo_valid(struct ocfs2_super *osb)
 {
 	return (osb->s_feature_incompat &
-		OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK);
+		(OCFS2_FEATURE_INCOMPAT_USERSPACE_STACK |
+		 OCFS2_FEATURE_INCOMPAT_CLUSTERINFO));
+}
+
+/* 1 if cluster info is valid and names a stack other than o2cb, else 0 */
+static inline int ocfs2_userspace_stack(struct ocfs2_super *osb)
+{
+	if (!ocfs2_clusterinfo_valid(osb))
+		return 0;
+	return !!memcmp(osb->osb_cluster_stack, OCFS2_CLASSIC_CLUSTER_STACK,
+			OCFS2_STACK_LABEL_LEN);
+}
+
+/* 1 if cluster info is valid and names the classic o2cb stack, else 0 */
+static inline int ocfs2_o2cb_stack(struct ocfs2_super *osb)
+{
+	if (!ocfs2_clusterinfo_valid(osb))
+		return 0;
+	return !memcmp(osb->osb_cluster_stack, OCFS2_CLASSIC_CLUSTER_STACK,
+		       OCFS2_STACK_LABEL_LEN);
+}
+
+static inline int ocfs2_cluster_o2cb_global_heartbeat(struct ocfs2_super *osb)
+{
+	return ocfs2_o2cb_stack(osb) &&
+		(osb->osb_stackflags & OCFS2_CLUSTER_O2CB_GLOBAL_HEARTBEAT);
 }
 
 static inline int ocfs2_mount_local(struct ocfs2_super *osb)
diff --git a/fs/ocfs2/ocfs2_fs.h b/fs/ocfs2/ocfs2_fs.h
index 723b20d..c2e4f82 100644
--- a/fs/ocfs2/ocfs2_fs.h
+++ b/fs/ocfs2/ocfs2_fs.h
@@ -101,7 +101,8 @@
 					 | OCFS2_FEATURE_INCOMPAT_META_ECC \
 					 | OCFS2_FEATURE_INCOMPAT_INDEXED_DIRS \
 					 | OCFS2_FEATURE_INCOMPAT_REFCOUNT_TREE \
-					 | OCFS2_FEATURE_INCOMPAT_DISCONTIG_BG)
+					 | OCFS2_FEATURE_INCOMPAT_DISCONTIG_BG	\
+					 | OCFS2_FEATURE_INCOMPAT_CLUSTERINFO)
 #define OCFS2_FEATURE_RO_COMPAT_SUPP	(OCFS2_FEATURE_RO_COMPAT_UNWRITTEN \
 					 | OCFS2_FEATURE_RO_COMPAT_USRQUOTA \
 					 | OCFS2_FEATURE_RO_COMPAT_GRPQUOTA)
@@ -170,6 +171,13 @@
 #define OCFS2_FEATURE_INCOMPAT_DISCONTIG_BG	0x2000
 
 /*
+ * Incompat bit to indicate usable clusterinfo with stackflags for all
+ * cluster stacks (userspace and o2cb). If this bit is set,
+ * INCOMPAT_USERSPACE_STACK becomes superfluous and thus should not be set.
+ */
+#define OCFS2_FEATURE_INCOMPAT_CLUSTERINFO	0x4000
+
+/*
  * backup superblock flag is used to indicate that this volume
  * has backup superblocks.
  */
@@ -235,18 +243,31 @@
 #define OCFS2_HAS_REFCOUNT_FL   (0x0010)
 
 /* Inode attributes, keep in sync with EXT2 */
-#define OCFS2_SECRM_FL		(0x00000001)	/* Secure deletion */
-#define OCFS2_UNRM_FL		(0x00000002)	/* Undelete */
-#define OCFS2_COMPR_FL		(0x00000004)	/* Compress file */
-#define OCFS2_SYNC_FL		(0x00000008)	/* Synchronous updates */
-#define OCFS2_IMMUTABLE_FL	(0x00000010)	/* Immutable file */
-#define OCFS2_APPEND_FL		(0x00000020)	/* writes to file may only append */
-#define OCFS2_NODUMP_FL		(0x00000040)	/* do not dump file */
-#define OCFS2_NOATIME_FL	(0x00000080)	/* do not update atime */
-#define OCFS2_DIRSYNC_FL	(0x00010000)	/* dirsync behaviour (directories only) */
+#define OCFS2_SECRM_FL			FS_SECRM_FL	/* Secure deletion */
+#define OCFS2_UNRM_FL			FS_UNRM_FL	/* Undelete */
+#define OCFS2_COMPR_FL			FS_COMPR_FL	/* Compress file */
+#define OCFS2_SYNC_FL			FS_SYNC_FL	/* Synchronous updates */
+#define OCFS2_IMMUTABLE_FL		FS_IMMUTABLE_FL	/* Immutable file */
+#define OCFS2_APPEND_FL			FS_APPEND_FL	/* writes to file may only append */
+#define OCFS2_NODUMP_FL			FS_NODUMP_FL	/* do not dump file */
+#define OCFS2_NOATIME_FL		FS_NOATIME_FL	/* do not update atime */
+/* Reserved for compression usage... */
+#define OCFS2_DIRTY_FL			FS_DIRTY_FL
+#define OCFS2_COMPRBLK_FL		FS_COMPRBLK_FL	/* One or more compressed clusters */
+#define OCFS2_NOCOMP_FL			FS_NOCOMP_FL	/* Don't compress */
+#define OCFS2_ECOMPR_FL			FS_ECOMPR_FL	/* Compression error */
+/* End compression flags --- maybe not all used */
+#define OCFS2_BTREE_FL			FS_BTREE_FL	/* btree format dir */
+#define OCFS2_INDEX_FL			FS_INDEX_FL	/* hash-indexed directory */
+#define OCFS2_IMAGIC_FL			FS_IMAGIC_FL	/* AFS directory */
+#define OCFS2_JOURNAL_DATA_FL		FS_JOURNAL_DATA_FL /* Reserved for ext3 */
+#define OCFS2_NOTAIL_FL			FS_NOTAIL_FL	/* file tail should not be merged */
+#define OCFS2_DIRSYNC_FL		FS_DIRSYNC_FL	/* dirsync behaviour (directories only) */
+#define OCFS2_TOPDIR_FL			FS_TOPDIR_FL	/* Top of directory hierarchies*/
+#define OCFS2_RESERVED_FL		FS_RESERVED_FL	/* reserved for ext2 lib */
 
-#define OCFS2_FL_VISIBLE	(0x000100FF)	/* User visible flags */
-#define OCFS2_FL_MODIFIABLE	(0x000100FF)	/* User modifiable flags */
+#define OCFS2_FL_VISIBLE		FS_FL_USER_VISIBLE	/* User visible flags */
+#define OCFS2_FL_MODIFIABLE		FS_FL_USER_MODIFIABLE	/* User modifiable flags */
 
 /*
  * Extent record flags (e_node.leaf.flags)
@@ -279,10 +300,13 @@
 #define OCFS2_VOL_UUID_LEN		16
 #define OCFS2_MAX_VOL_LABEL_LEN		64
 
-/* The alternate, userspace stack fields */
+/* The cluster stack fields */
 #define OCFS2_STACK_LABEL_LEN		4
 #define OCFS2_CLUSTER_NAME_LEN		16
 
+/* Classic (historically speaking) cluster stack */
+#define OCFS2_CLASSIC_CLUSTER_STACK	"o2cb"
+
 /* Journal limits (in bytes) */
 #define OCFS2_MIN_JOURNAL_SIZE		(4 * 1024 * 1024)
 
@@ -292,6 +316,11 @@
  */
 #define OCFS2_MIN_XATTR_INLINE_SIZE     256
 
+/*
+ * Cluster info flags (ocfs2_cluster_info.ci_stackflags)
+ */
+#define OCFS2_CLUSTER_O2CB_GLOBAL_HEARTBEAT	(0x01)
+
 struct ocfs2_system_inode_info {
 	char	*si_name;
 	int	si_iflags;
@@ -352,6 +381,7 @@
 /* Parameter passed from mount.ocfs2 to module */
 #define OCFS2_HB_NONE			"heartbeat=none"
 #define OCFS2_HB_LOCAL			"heartbeat=local"
+#define OCFS2_HB_GLOBAL			"heartbeat=global"
 
 /*
  * OCFS2 directory file types.  Only the low 3 bits are used.  The
@@ -558,9 +588,21 @@
  */
 };
 
+/*
+ * ci_stackflags is only valid if the incompat bit
+ * OCFS2_FEATURE_INCOMPAT_CLUSTERINFO is set.
+ */
 struct ocfs2_cluster_info {
 /*00*/	__u8   ci_stack[OCFS2_STACK_LABEL_LEN];
-	__le32 ci_reserved;
+	union {
+		__le32 ci_reserved;
+		struct {
+			__u8 ci_stackflags;
+			__u8 ci_reserved1;
+			__u8 ci_reserved2;
+			__u8 ci_reserved3;
+		};
+	};
 /*08*/	__u8   ci_cluster[OCFS2_CLUSTER_NAME_LEN];
 /*18*/
 };
@@ -597,9 +639,9 @@
 					 * group header */
 /*50*/	__u8  s_label[OCFS2_MAX_VOL_LABEL_LEN];	/* Label for mounting, etc. */
 /*90*/	__u8  s_uuid[OCFS2_VOL_UUID_LEN];	/* 128-bit uuid */
-/*A0*/  struct ocfs2_cluster_info s_cluster_info; /* Selected userspace
-						     stack.  Only valid
-						     with INCOMPAT flag. */
+/*A0*/  struct ocfs2_cluster_info s_cluster_info; /* Only valid if either
+						     userspace or clusterinfo
+						     INCOMPAT flag set. */
 /*B8*/	__le16 s_xattr_inline_size;	/* extended attribute inline size
 					   for this fs*/
 	__le16 s_reserved0;
diff --git a/fs/ocfs2/ocfs2_ioctl.h b/fs/ocfs2/ocfs2_ioctl.h
index 9bc5354..b46f39b 100644
--- a/fs/ocfs2/ocfs2_ioctl.h
+++ b/fs/ocfs2/ocfs2_ioctl.h
@@ -23,10 +23,10 @@
 /*
  * ioctl commands
  */
-#define OCFS2_IOC_GETFLAGS	_IOR('f', 1, long)
-#define OCFS2_IOC_SETFLAGS	_IOW('f', 2, long)
-#define OCFS2_IOC32_GETFLAGS	_IOR('f', 1, int)
-#define OCFS2_IOC32_SETFLAGS	_IOW('f', 2, int)
+#define OCFS2_IOC_GETFLAGS	FS_IOC_GETFLAGS
+#define OCFS2_IOC_SETFLAGS	FS_IOC_SETFLAGS
+#define OCFS2_IOC32_GETFLAGS	FS_IOC32_GETFLAGS
+#define OCFS2_IOC32_SETFLAGS	FS_IOC32_SETFLAGS
 
 /*
  * Space reservation / allocation / free ioctls and argument structure
diff --git a/fs/ocfs2/refcounttree.c b/fs/ocfs2/refcounttree.c
index a120cfc..b5f9160 100644
--- a/fs/ocfs2/refcounttree.c
+++ b/fs/ocfs2/refcounttree.c
@@ -4240,8 +4240,9 @@
 		goto out;
 	}
 
-	mutex_lock(&new_inode->i_mutex);
-	ret = ocfs2_inode_lock(new_inode, &new_bh, 1);
+	mutex_lock_nested(&new_inode->i_mutex, I_MUTEX_CHILD);
+	ret = ocfs2_inode_lock_nested(new_inode, &new_bh, 1,
+				      OI_LS_REFLINK_TARGET);
 	if (ret) {
 		mlog_errno(ret);
 		goto out_unlock;
diff --git a/fs/ocfs2/reservations.c b/fs/ocfs2/reservations.c
index d8b6e42..3e78db3 100644
--- a/fs/ocfs2/reservations.c
+++ b/fs/ocfs2/reservations.c
@@ -732,25 +732,23 @@
 			   struct ocfs2_alloc_reservation *resv,
 			   int *cstart, int *clen)
 {
-	unsigned int wanted = *clen;
-
 	if (resv == NULL || ocfs2_resmap_disabled(resmap))
 		return -ENOSPC;
 
 	spin_lock(&resv_lock);
 
-	/*
-	 * We don't want to over-allocate for temporary
-	 * windows. Otherwise, we run the risk of fragmenting the
-	 * allocation space.
-	 */
-	wanted = ocfs2_resv_window_bits(resmap, resv);
-	if ((resv->r_flags & OCFS2_RESV_FLAG_TMP) || wanted < *clen)
-		wanted = *clen;
-
 	if (ocfs2_resv_empty(resv)) {
-		mlog(0, "empty reservation, find new window\n");
+		/*
+		 * We don't want to over-allocate for temporary
+		 * windows. Otherwise, we run the risk of fragmenting the
+		 * allocation space.
+		 */
+		unsigned int wanted = ocfs2_resv_window_bits(resmap, resv);
 
+		if ((resv->r_flags & OCFS2_RESV_FLAG_TMP) || wanted < *clen)
+			wanted = *clen;
+
+		mlog(0, "empty reservation, find new window\n");
 		/*
 		 * Try to get a window here. If it works, we must fall
 		 * through and test the bitmap . This avoids some
diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c
index 0d3049f..19965b0 100644
--- a/fs/ocfs2/stack_o2cb.c
+++ b/fs/ocfs2/stack_o2cb.c
@@ -283,6 +283,8 @@
 	/* for now we only have one cluster/node, make sure we see it
 	 * in the heartbeat universe */
 	if (!o2hb_check_local_node_heartbeating()) {
+		if (o2hb_global_heartbeat_active())
+			mlog(ML_ERROR, "Global heartbeat not started\n");
 		rc = -EINVAL;
 		goto out;
 	}
diff --git a/fs/ocfs2/suballoc.c b/fs/ocfs2/suballoc.c
index 64f2c50..5fed60d 100644
--- a/fs/ocfs2/suballoc.c
+++ b/fs/ocfs2/suballoc.c
@@ -357,7 +357,7 @@
 static void ocfs2_bg_discontig_add_extent(struct ocfs2_super *osb,
 					  struct ocfs2_group_desc *bg,
 					  struct ocfs2_chain_list *cl,
-					  u64 p_blkno, u32 clusters)
+					  u64 p_blkno, unsigned int clusters)
 {
 	struct ocfs2_extent_list *el = &bg->bg_list;
 	struct ocfs2_extent_rec *rec;
@@ -369,7 +369,7 @@
 	rec->e_blkno = cpu_to_le64(p_blkno);
 	rec->e_cpos = cpu_to_le32(le16_to_cpu(bg->bg_bits) /
 				  le16_to_cpu(cl->cl_bpc));
-	rec->e_leaf_clusters = cpu_to_le32(clusters);
+	rec->e_leaf_clusters = cpu_to_le16(clusters);
 	le16_add_cpu(&bg->bg_bits, clusters * le16_to_cpu(cl->cl_bpc));
 	le16_add_cpu(&bg->bg_free_bits_count,
 		     clusters * le16_to_cpu(cl->cl_bpc));
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 9122d59..a8a0ca4 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -162,6 +162,7 @@
 	Opt_nointr,
 	Opt_hb_none,
 	Opt_hb_local,
+	Opt_hb_global,
 	Opt_data_ordered,
 	Opt_data_writeback,
 	Opt_atime_quantum,
@@ -192,6 +193,7 @@
 	{Opt_nointr, "nointr"},
 	{Opt_hb_none, OCFS2_HB_NONE},
 	{Opt_hb_local, OCFS2_HB_LOCAL},
+	{Opt_hb_global, OCFS2_HB_GLOBAL},
 	{Opt_data_ordered, "data=ordered"},
 	{Opt_data_writeback, "data=writeback"},
 	{Opt_atime_quantum, "atime_quantum=%u"},
@@ -626,6 +628,7 @@
 	int ret = 0;
 	struct mount_options parsed_options;
 	struct ocfs2_super *osb = OCFS2_SB(sb);
+	u32 tmp;
 
 	lock_kernel();
 
@@ -635,8 +638,9 @@
 		goto out;
 	}
 
-	if ((osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) !=
-	    (parsed_options.mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
+	tmp = OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL |
+		OCFS2_MOUNT_HB_NONE;
+	if ((osb->s_mount_opt & tmp) != (parsed_options.mount_opt & tmp)) {
 		ret = -EINVAL;
 		mlog(ML_ERROR, "Cannot change heartbeat mode on remount\n");
 		goto out;
@@ -827,23 +831,29 @@
 
 static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
 {
-	if (ocfs2_mount_local(osb)) {
-		if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+	u32 hb_enabled = OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL;
+
+	if (osb->s_mount_opt & hb_enabled) {
+		if (ocfs2_mount_local(osb)) {
 			mlog(ML_ERROR, "Cannot heartbeat on a locally "
 			     "mounted device.\n");
 			return -EINVAL;
 		}
-	}
-
-	if (ocfs2_userspace_stack(osb)) {
-		if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+		if (ocfs2_userspace_stack(osb)) {
 			mlog(ML_ERROR, "Userspace stack expected, but "
 			     "o2cb heartbeat arguments passed to mount\n");
 			return -EINVAL;
 		}
+		if (((osb->s_mount_opt & OCFS2_MOUNT_HB_GLOBAL) &&
+		     !ocfs2_cluster_o2cb_global_heartbeat(osb)) ||
+		    ((osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) &&
+		     ocfs2_cluster_o2cb_global_heartbeat(osb))) {
+			mlog(ML_ERROR, "Mismatching o2cb heartbeat modes\n");
+			return -EINVAL;
+		}
 	}
 
-	if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
+	if (!(osb->s_mount_opt & hb_enabled)) {
 		if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb) &&
 		    !ocfs2_userspace_stack(osb)) {
 			mlog(ML_ERROR, "Heartbeat has to be started to mount "
@@ -1309,6 +1319,7 @@
 {
 	int status;
 	char *p;
+	u32 tmp;
 
 	mlog_entry("remount: %d, options: \"%s\"\n", is_remount,
 		   options ? options : "(none)");
@@ -1340,7 +1351,10 @@
 			mopt->mount_opt |= OCFS2_MOUNT_HB_LOCAL;
 			break;
 		case Opt_hb_none:
-			mopt->mount_opt &= ~OCFS2_MOUNT_HB_LOCAL;
+			mopt->mount_opt |= OCFS2_MOUNT_HB_NONE;
+			break;
+		case Opt_hb_global:
+			mopt->mount_opt |= OCFS2_MOUNT_HB_GLOBAL;
 			break;
 		case Opt_barrier:
 			if (match_int(&args[0], &option)) {
@@ -1501,6 +1515,15 @@
 		}
 	}
 
+	/* Ensure only one heartbeat mode */
+	tmp = mopt->mount_opt & (OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL |
+				 OCFS2_MOUNT_HB_NONE);
+	if (hweight32(tmp) != 1) {
+		mlog(ML_ERROR, "Invalid heartbeat mount options\n");
+		status = 0;
+		goto bail;
+	}
+
 	status = 1;
 
 bail:
@@ -1514,10 +1537,14 @@
 	unsigned long opts = osb->s_mount_opt;
 	unsigned int local_alloc_megs;
 
-	if (opts & OCFS2_MOUNT_HB_LOCAL)
-		seq_printf(s, ",_netdev,heartbeat=local");
-	else
-		seq_printf(s, ",heartbeat=none");
+	if (opts & (OCFS2_MOUNT_HB_LOCAL | OCFS2_MOUNT_HB_GLOBAL)) {
+		seq_printf(s, ",_netdev");
+		if (opts & OCFS2_MOUNT_HB_LOCAL)
+			seq_printf(s, ",%s", OCFS2_HB_LOCAL);
+		else
+			seq_printf(s, ",%s", OCFS2_HB_GLOBAL);
+	} else
+		seq_printf(s, ",%s", OCFS2_HB_NONE);
 
 	if (opts & OCFS2_MOUNT_NOINTR)
 		seq_printf(s, ",nointr");
@@ -2209,7 +2236,9 @@
 		goto bail;
 	}
 
-	if (ocfs2_userspace_stack(osb)) {
+	if (ocfs2_clusterinfo_valid(osb)) {
+		osb->osb_stackflags =
+			OCFS2_RAW_SB(di)->s_cluster_info.ci_stackflags;
 		memcpy(osb->osb_cluster_stack,
 		       OCFS2_RAW_SB(di)->s_cluster_info.ci_stack,
 		       OCFS2_STACK_LABEL_LEN);
diff --git a/fs/ocfs2/symlink.c b/fs/ocfs2/symlink.c
index 32499d21..9975457 100644
--- a/fs/ocfs2/symlink.c
+++ b/fs/ocfs2/symlink.c
@@ -128,7 +128,7 @@
 	}
 
 	/* Fast symlinks can't be large */
-	len = strlen(target);
+	len = strnlen(target, ocfs2_fast_symlink_chars(inode->i_sb));
 	link = kzalloc(len + 1, GFP_NOFS);
 	if (!link) {
 		status = -ENOMEM;
diff --git a/fs/ocfs2/xattr.c b/fs/ocfs2/xattr.c
index d03469f..06fa5e7 100644
--- a/fs/ocfs2/xattr.c
+++ b/fs/ocfs2/xattr.c
@@ -1286,13 +1286,11 @@
 	xis.inode_bh = xbs.inode_bh = di_bh;
 	di = (struct ocfs2_dinode *)di_bh->b_data;
 
-	down_read(&oi->ip_xattr_sem);
 	ret = ocfs2_xattr_ibody_get(inode, name_index, name, buffer,
 				    buffer_size, &xis);
 	if (ret == -ENODATA && di->i_xattr_loc)
 		ret = ocfs2_xattr_block_get(inode, name_index, name, buffer,
 					    buffer_size, &xbs);
-	up_read(&oi->ip_xattr_sem);
 
 	return ret;
 }
@@ -1316,8 +1314,10 @@
 		mlog_errno(ret);
 		return ret;
 	}
+	down_read(&OCFS2_I(inode)->ip_xattr_sem);
 	ret = ocfs2_xattr_get_nolock(inode, di_bh, name_index,
 				     name, buffer, buffer_size);
+	up_read(&OCFS2_I(inode)->ip_xattr_sem);
 
 	ocfs2_inode_unlock(inode, 0);