diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h
index 5351b08c897a22568e586ccbf3da3b28b47aeb8f..5f7b2c4a09ab86ab126ff9569f8b39639a9da1f3 100644
--- a/include/uapi/linux/tipc.h
+++ b/include/uapi/linux/tipc.h
@@ -231,6 +231,20 @@ struct sockaddr_tipc {
 #define TIPC_SOCK_RECVQ_DEPTH	132	/* Default: none (read only) */
 #define TIPC_MCAST_BROADCAST    133     /* Default: TIPC selects. No arg */
 #define TIPC_MCAST_REPLICAST    134     /* Default: TIPC selects. No arg */
+#define TIPC_GROUP_JOIN         135     /* Takes struct tipc_group_req* */
+#define TIPC_GROUP_LEAVE        136     /* No argument */
+
+/*
+ * Flag values
+ */
+#define TIPC_GROUP_LOOPBACK     0x1  /* Receive copy of sent msg when match */
+
+struct tipc_group_req {
+	__u32 type;      /* group id */
+	__u32 instance;  /* member id */
+	__u32 scope;     /* zone/cluster/node */
+	__u32 flags;
+};
 
 /*
  * Maximum sizes of TIPC bearer-related names (including terminating NULL)
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index 31b9f9c5297421ef0430228c1deb3827d4c537d5..a3af73ec0b78aaf52c037e0ad44ffd35c7d4395a 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -8,7 +8,7 @@ tipc-y	+= addr.o bcast.o bearer.o \
 	   core.o link.o discover.o msg.o  \
 	   name_distr.o  subscr.o monitor.o name_table.o net.o  \
 	   netlink.o netlink_compat.o node.o socket.o eth_media.o \
-	   server.o socket.o
+	   server.o socket.o group.o
 
 tipc-$(CONFIG_TIPC_MEDIA_UDP)	+= udp_media.o
 tipc-$(CONFIG_TIPC_MEDIA_IB)	+= ib_media.o
diff --git a/net/tipc/group.c b/net/tipc/group.c
new file mode 100644
index 0000000000000000000000000000000000000000..3f0e1ce1e3b9a55b33bc6989cf95d5d76e116fdc
--- /dev/null
+++ b/net/tipc/group.c
@@ -0,0 +1,404 @@
+/*
+ * net/tipc/group.c: TIPC group messaging code
+ *
+ * Copyright (c) 2017, Ericsson AB
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "core.h"
+#include "addr.h"
+#include "group.h"
+#include "bcast.h"
+#include "server.h"
+#include "msg.h"
+#include "socket.h"
+#include "node.h"
+#include "name_table.h"
+#include "subscr.h"
+
+#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
+#define ADV_IDLE ADV_UNIT
+
+enum mbr_state {
+	MBR_QUARANTINED,
+	MBR_DISCOVERED,
+	MBR_JOINING,
+	MBR_PUBLISHED,
+	MBR_JOINED,
+	MBR_LEAVING
+};
+
+struct tipc_member {
+	struct rb_node tree_node;
+	struct list_head list;
+	u32 node;
+	u32 port;
+	enum mbr_state state;
+	u16 bc_rcv_nxt;
+};
+
+struct tipc_group {
+	struct rb_root members;
+	struct tipc_nlist dests;
+	struct net *net;
+	int subid;
+	u32 type;
+	u32 instance;
+	u32 domain;
+	u32 scope;
+	u32 portid;
+	u16 member_cnt;
+	u16 bc_snd_nxt;
+	bool loopback;
+};
+
+static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
+				  int mtyp, struct sk_buff_head *xmitq);
+
+u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
+{
+	return grp->bc_snd_nxt;
+}
+
+static bool tipc_group_is_receiver(struct tipc_member *m)
+{
+	return m && m->state >= MBR_JOINED;
+}
+
+int tipc_group_size(struct tipc_group *grp)
+{
+	return grp->member_cnt;
+}
+
+struct tipc_group *tipc_group_create(struct net *net, u32 portid,
+				     struct tipc_group_req *mreq)
+{
+	struct tipc_group *grp;
+	u32 type = mreq->type;
+
+	grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
+	if (!grp)
+		return NULL;
+	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
+	grp->members = RB_ROOT;
+	grp->net = net;
+	grp->portid = portid;
+	grp->domain = addr_domain(net, mreq->scope);
+	grp->type = type;
+	grp->instance = mreq->instance;
+	grp->scope = mreq->scope;
+	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
+	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
+		return grp;
+	kfree(grp);
+	return NULL;
+}
+
+void tipc_group_delete(struct net *net, struct tipc_group *grp)
+{
+	struct rb_root *tree = &grp->members;
+	struct tipc_member *m, *tmp;
+	struct sk_buff_head xmitq;
+
+	__skb_queue_head_init(&xmitq);
+
+	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
+		tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
+		list_del(&m->list);
+		kfree(m);
+	}
+	tipc_node_distr_xmit(net, &xmitq);
+	tipc_nlist_purge(&grp->dests);
+	tipc_topsrv_kern_unsubscr(net, grp->subid);
+	kfree(grp);
+}
+
+struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
+					   u32 node, u32 port)
+{
+	struct rb_node *n = grp->members.rb_node;
+	u64 nkey, key = (u64)node << 32 | port;
+	struct tipc_member *m;
+
+	while (n) {
+		m = container_of(n, struct tipc_member, tree_node);
+		nkey = (u64)m->node << 32 | m->port;
+		if (key < nkey)
+			n = n->rb_left;
+		else if (key > nkey)
+			n = n->rb_right;
+		else
+			return m;
+	}
+	return NULL;
+}
+
+static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
+						u32 node)
+{
+	struct tipc_member *m;
+	struct rb_node *n;
+
+	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
+		m = container_of(n, struct tipc_member, tree_node);
+		if (m->node == node)
+			return m;
+	}
+	return NULL;
+}
+
+static void tipc_group_add_to_tree(struct tipc_group *grp,
+				   struct tipc_member *m)
+{
+	u64 nkey, key = (u64)m->node << 32 | m->port;
+	struct rb_node **n, *parent = NULL;
+	struct tipc_member *tmp;
+
+	n = &grp->members.rb_node;
+	while (*n) {
+		tmp = container_of(*n, struct tipc_member, tree_node);
+		parent = *n;
+		tmp = container_of(parent, struct tipc_member, tree_node);
+		nkey = (u64)tmp->node << 32 | tmp->port;
+		if (key < nkey)
+			n = &(*n)->rb_left;
+		else if (key > nkey)
+			n = &(*n)->rb_right;
+		else
+			return;
+	}
+	rb_link_node(&m->tree_node, parent, n);
+	rb_insert_color(&m->tree_node, &grp->members);
+}
+
+static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
+						    u32 node, u32 port,
+						    int state)
+{
+	struct tipc_member *m;
+
+	m = kzalloc(sizeof(*m), GFP_ATOMIC);
+	if (!m)
+		return NULL;
+	INIT_LIST_HEAD(&m->list);
+	m->node = node;
+	m->port = port;
+	grp->member_cnt++;
+	tipc_group_add_to_tree(grp, m);
+	tipc_nlist_add(&grp->dests, m->node);
+	m->state = state;
+	return m;
+}
+
+void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
+{
+	tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
+}
+
+static void tipc_group_delete_member(struct tipc_group *grp,
+				     struct tipc_member *m)
+{
+	rb_erase(&m->tree_node, &grp->members);
+	grp->member_cnt--;
+	list_del_init(&m->list);
+
+	/* If last member on a node, remove node from dest list */
+	if (!tipc_group_find_node(grp, m->node))
+		tipc_nlist_del(&grp->dests, m->node);
+
+	kfree(m);
+}
+
+struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
+{
+	return &grp->dests;
+}
+
+void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
+		     int *scope)
+{
+	seq->type = grp->type;
+	seq->lower = grp->instance;
+	seq->upper = grp->instance;
+	*scope = grp->scope;
+}
+
+void tipc_group_update_bc_members(struct tipc_group *grp)
+{
+	grp->bc_snd_nxt++;
+}
+
+/* tipc_group_filter_msg() - determine if we should accept arriving message
+ */
+void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
+			   struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb = __skb_dequeue(inputq);
+	struct tipc_member *m;
+	struct tipc_msg *hdr;
+	u32 node, port;
+	int mtyp;
+
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+	mtyp = msg_type(hdr);
+	node =  msg_orignode(hdr);
+	port = msg_origport(hdr);
+
+	if (!msg_in_group(hdr))
+		goto drop;
+
+	m = tipc_group_find_member(grp, node, port);
+	if (!tipc_group_is_receiver(m))
+		goto drop;
+
+	__skb_queue_tail(inputq, skb);
+
+	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
+	return;
+drop:
+	kfree_skb(skb);
+}
+
+static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
+				  int mtyp, struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr;
+	struct sk_buff *skb;
+
+	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
+			      m->node, tipc_own_addr(grp->net),
+			      m->port, grp->portid, 0);
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+	if (mtyp == GRP_JOIN_MSG)
+		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
+	__skb_queue_tail(xmitq, skb);
+}
+
+void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
+			  struct sk_buff_head *xmitq)
+{
+	u32 node = msg_orignode(hdr);
+	u32 port = msg_origport(hdr);
+	struct tipc_member *m;
+
+	if (!grp)
+		return;
+
+	m = tipc_group_find_member(grp, node, port);
+
+	switch (msg_type(hdr)) {
+	case GRP_JOIN_MSG:
+		if (!m)
+			m = tipc_group_create_member(grp, node, port,
+						     MBR_QUARANTINED);
+		if (!m)
+			return;
+		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+
+		/* Wait until PUBLISH event is received */
+		if (m->state == MBR_DISCOVERED)
+			m->state = MBR_JOINING;
+		else if (m->state == MBR_PUBLISHED)
+			m->state = MBR_JOINED;
+		return;
+	case GRP_LEAVE_MSG:
+		if (!m)
+			return;
+
+		/* Wait until WITHDRAW event is received */
+		if (m->state != MBR_LEAVING) {
+			m->state = MBR_LEAVING;
+			return;
+		}
+		/* Otherwise deliver already received WITHDRAW event */
+		tipc_group_delete_member(grp, m);
+		return;
+	default:
+		pr_warn("Received unknown GROUP_PROTO message\n");
+	}
+}
+
+/* tipc_group_member_evt() - receive and handle a member up/down event
+ */
+void tipc_group_member_evt(struct tipc_group *grp,
+			   struct sk_buff *skb,
+			   struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr = buf_msg(skb);
+	struct tipc_event *evt = (void *)msg_data(hdr);
+	u32 node = evt->port.node;
+	u32 port = evt->port.ref;
+	struct tipc_member *m;
+	struct net *net;
+	u32 self;
+
+	if (!grp)
+		goto drop;
+
+	net = grp->net;
+	self = tipc_own_addr(net);
+	if (!grp->loopback && node == self && port == grp->portid)
+		goto drop;
+
+	m = tipc_group_find_member(grp, node, port);
+
+	if (evt->event == TIPC_PUBLISHED) {
+		if (!m)
+			m = tipc_group_create_member(grp, node, port,
+						     MBR_DISCOVERED);
+		if (!m)
+			goto drop;
+
+		/* Wait if JOIN message not yet received */
+		if (m->state == MBR_DISCOVERED)
+			m->state = MBR_PUBLISHED;
+		else
+			m->state = MBR_JOINED;
+		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+	} else if (evt->event == TIPC_WITHDRAWN) {
+		if (!m)
+			goto drop;
+
+		/* Keep back event if more messages might be expected */
+		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
+			m->state = MBR_LEAVING;
+		else
+			tipc_group_delete_member(grp, m);
+	}
+drop:
+	kfree_skb(skb);
+}
diff --git a/net/tipc/group.h b/net/tipc/group.h
new file mode 100644
index 0000000000000000000000000000000000000000..9bdf4479fc0355618a8f9a0b189928d81ec08326
--- /dev/null
+++ b/net/tipc/group.h
@@ -0,0 +1,64 @@
+/*
+ * net/tipc/group.h: Include file for TIPC group unicast/multicast functions
+ *
+ * Copyright (c) 2017, Ericsson AB
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TIPC_GROUP_H
+#define _TIPC_GROUP_H
+
+#include "core.h"
+
+struct tipc_group;
+struct tipc_member;
+struct tipc_msg;
+
+struct tipc_group *tipc_group_create(struct net *net, u32 portid,
+				     struct tipc_group_req *mreq);
+void tipc_group_delete(struct net *net, struct tipc_group *grp);
+void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
+struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
+void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
+		     int *scope);
+void tipc_group_filter_msg(struct tipc_group *grp,
+			   struct sk_buff_head *inputq,
+			   struct sk_buff_head *xmitq);
+void tipc_group_member_evt(struct tipc_group *grp,
+			   struct sk_buff *skb,
+			   struct sk_buff_head *xmitq);
+void tipc_group_proto_rcv(struct tipc_group *grp,
+			  struct tipc_msg *hdr,
+			  struct sk_buff_head *xmitq);
+void tipc_group_update_bc_members(struct tipc_group *grp);
+u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
+int tipc_group_size(struct tipc_group *grp);
+#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac0144f532aa66ca147ef22f288bad0980861680..bd25bff63925d82b8dec024358f0763b0796f0b4 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
 	case TIPC_MEDIUM_IMPORTANCE:
 	case TIPC_HIGH_IMPORTANCE:
 	case TIPC_CRITICAL_IMPORTANCE:
-		if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
+		if (unlikely(msg_mcast(hdr))) {
 			skb_queue_tail(l->bc_rcvlink->inputq, skb);
 			return true;
 		}
 	case CONN_MANAGER:
+	case GROUP_PROTOCOL:
 		skb_queue_tail(inputq, skb);
 		return true;
 	case NAME_DISTRIBUTOR:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index be3e38aa9dd251db6117c341d13642a2031ac5b7..dad400935405b3d6cd711e0f8d3176c9a3e0cabe 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/msg.h: Include file for TIPC message header routines
  *
- * Copyright (c) 2000-2007, 2014-2015 Ericsson AB
+ * Copyright (c) 2000-2007, 2014-2017 Ericsson AB
  * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -61,10 +61,11 @@ struct plist;
 /*
  * Payload message types
  */
-#define TIPC_CONN_MSG		0
-#define TIPC_MCAST_MSG		1
-#define TIPC_NAMED_MSG		2
-#define TIPC_DIRECT_MSG		3
+#define TIPC_CONN_MSG           0
+#define TIPC_MCAST_MSG          1
+#define TIPC_NAMED_MSG          2
+#define TIPC_DIRECT_MSG         3
+#define TIPC_GRP_BCAST_MSG      4
 
 /*
  * Internal message users
@@ -73,6 +74,7 @@ struct plist;
 #define  MSG_BUNDLER          6
 #define  LINK_PROTOCOL        7
 #define  CONN_MANAGER         8
+#define  GROUP_PROTOCOL       9
 #define  TUNNEL_PROTOCOL      10
 #define  NAME_DISTRIBUTOR     11
 #define  MSG_FRAGMENTER       12
@@ -87,6 +89,7 @@ struct plist;
 #define BASIC_H_SIZE              32	/* Basic payload message */
 #define NAMED_H_SIZE              40	/* Named payload message */
 #define MCAST_H_SIZE              44	/* Multicast payload message */
+#define GROUP_H_SIZE              44	/* Group payload message */
 #define INT_H_SIZE                40	/* Internal messages */
 #define MIN_H_SIZE                24	/* Smallest legal TIPC header size */
 #define MAX_H_SIZE                60	/* Largest possible TIPC header size */
@@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 1, 29, 0x7, n);
 }
 
+static inline int msg_in_group(struct tipc_msg *m)
+{
+	return (msg_type(m) == TIPC_GRP_BCAST_MSG);
+}
+
 static inline u32 msg_named(struct tipc_msg *m)
 {
 	return msg_type(m) == TIPC_NAMED_MSG;
@@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m)
 
 static inline u32 msg_mcast(struct tipc_msg *m)
 {
-	return msg_type(m) == TIPC_MCAST_MSG;
+	int mtyp = msg_type(m);
+
+	return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG));
 }
 
 static inline u32 msg_connected(struct tipc_msg *m)
@@ -514,6 +524,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
 #define DSC_REQ_MSG		0
 #define DSC_RESP_MSG		1
 
+/*
+ * Group protocol message types
+ */
+#define GRP_JOIN_MSG         0
+#define GRP_LEAVE_MSG        1
+
 /*
  * Word 1
  */
@@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
+static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m)
+{
+	return msg_bits(m, 9, 16, 0xffff);
+}
+
+static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
+{
+	msg_set_bits(m, 9, 16, 0xffff, n);
+}
+
+/* Word 10
+ */
+static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
+{
+	return msg_bits(m, 10, 16, 0xffff);
+}
+
+static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n)
+{
+	msg_set_bits(m, 10, 16, 0xffff, n);
+}
+
 static inline bool msg_peer_link_is_up(struct tipc_msg *m)
 {
 	if (likely(msg_user(m) != LINK_PROTOCOL))
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 76bd2777baafafc98fdb9fdbb3b7a6ac7b462606..114d72bab82729c59ed28bd5207165f368533087 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -43,6 +43,7 @@
 #include "bcast.h"
 #include "addr.h"
 #include "node.h"
+#include "group.h"
 #include <net/genetlink.h>
 
 #define TIPC_NAMETBL_SIZE 1024		/* must be a power of 2 */
@@ -596,18 +597,6 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
 	return ref;
 }
 
-/**
- * tipc_nametbl_mc_translate - find multicast destinations
- *
- * Creates list of all local ports that overlap the given multicast address;
- * also determines if any off-node ports overlap.
- *
- * Note: Publications with a scope narrower than 'limit' are ignored.
- * (i.e. local node-scope publications mustn't receive messages arriving
- * from another node, even if the multcast link brought it here)
- *
- * Returns non-zero if any off-node ports overlap
- */
 int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 			      u32 limit, struct list_head *dports)
 {
@@ -679,6 +668,37 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 	rcu_read_unlock();
 }
 
+/* tipc_nametbl_build_group - build list of communication group members
+ */
+void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
+			      u32 type, u32 domain)
+{
+	struct sub_seq *sseq, *stop;
+	struct name_info *info;
+	struct publication *p;
+	struct name_seq *seq;
+
+	rcu_read_lock();
+	seq = nametbl_find_seq(net, type);
+	if (!seq)
+		goto exit;
+
+	spin_lock_bh(&seq->lock);
+	sseq = seq->sseqs;
+	stop = seq->sseqs + seq->first_free;
+	for (; sseq != stop; sseq++) {
+		info = sseq->info;
+		list_for_each_entry(p, &info->zone_list, zone_list) {
+			if (!tipc_in_scope(domain, p->node))
+				continue;
+			tipc_group_add_member(grp, p->node, p->ref);
+		}
+	}
+	spin_unlock_bh(&seq->lock);
+exit:
+	rcu_read_unlock();
+}
+
 /*
  * tipc_nametbl_publish - add name publication to network name tables
  */
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index d121175a92b5125b63454498f2e8a35ef46830e0..97646b17a4a2532a8f7f51dc3f9295a3ac829cbe 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -40,6 +40,7 @@
 struct tipc_subscription;
 struct tipc_plist;
 struct tipc_nlist;
+struct tipc_group;
 
 /*
  * TIPC name types reserved for internal TIPC use (both current and planned)
@@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
 u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
 int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 			      u32 limit, struct list_head *dports);
+void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
+			      u32 type, u32 domain);
 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 				   u32 upper, u32 domain,
 				   struct tipc_nlist *nodes);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index df2f2197c4ad1117410bebb1e300f71818b1e2bd..acd58d23a70e36dc340ea2fee94e9ac65a5361cb 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -48,7 +48,8 @@ enum {
 	TIPC_BCAST_SYNCH      = (1 << 1),
 	TIPC_BCAST_STATE_NACK = (1 << 2),
 	TIPC_BLOCK_FLOWCTL    = (1 << 3),
-	TIPC_BCAST_RCAST      = (1 << 4)
+	TIPC_BCAST_RCAST      = (1 << 4),
+	TIPC_MCAST_GROUPS     = (1 << 5)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index daf7c4df4531dcda30ba400dce12189238aa024e..64bbf9d036290f890c6b816b5c6e1576ad0df3b2 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/socket.c: TIPC socket API
  *
- * Copyright (c) 2001-2007, 2012-2016, Ericsson AB
+ * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -45,6 +45,7 @@
 #include "socket.h"
 #include "bcast.h"
 #include "netlink.h"
+#include "group.h"
 
 #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
 #define CONN_PROBING_INTERVAL	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
@@ -78,7 +79,7 @@ enum {
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
  * @cong_link_cnt: number of congested links
- * @sent_unacked: # messages sent by socket, and not yet acked by peer
+ * @snt_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
  * @peer: 'connected' peer for dgram/rdm
  * @node: hash table node
@@ -109,6 +110,7 @@ struct tipc_sock {
 	struct rhash_head node;
 	struct tipc_mc_method mc_method;
 	struct rcu_head rcu;
+	struct tipc_group *group;
 };
 
 static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
 			   struct tipc_name_seq const *seq);
 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 			    struct tipc_name_seq const *seq);
+static int tipc_sk_leave(struct tipc_sock *tsk);
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
 static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
@@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock)
 
 	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
 	sk->sk_shutdown = SHUTDOWN_MASK;
+	tipc_sk_leave(tsk);
 	tipc_sk_withdraw(tsk, 0, NULL);
 	sk_stop_timer(sk, &sk->sk_timer);
 	tipc_sk_remove(tsk);
@@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
 		res = tipc_sk_withdraw(tsk, 0, NULL);
 		goto exit;
 	}
-
+	if (tsk->group) {
+		res = -EACCES;
+		goto exit;
+	}
 	if (uaddr_len < sizeof(struct sockaddr_tipc)) {
 		res = -EINVAL;
 		goto exit;
@@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
 	u32 mask = 0;
 
 	sock_poll_wait(file, sk_sleep(sk), wait);
@@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 			mask |= (POLLIN | POLLRDNORM);
 		break;
 	case TIPC_OPEN:
-		if (!tsk->cong_link_cnt)
-			mask |= POLLOUT;
+		if (!grp || tipc_group_size(grp))
+			if (!tsk->cong_link_cnt)
+				mask |= POLLOUT;
 		if (tipc_sk_type_connectionless(sk) &&
 		    (!skb_queue_empty(&sk->sk_receive_queue)))
 			mask |= (POLLIN | POLLRDNORM);
@@ -757,6 +766,9 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 	struct tipc_nlist dsts;
 	int rc;
 
+	if (tsk->group)
+		return -EACCES;
+
 	/* Block or return if any destination link is congested */
 	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
 	if (unlikely(rc))
@@ -793,6 +805,64 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 	return rc ? rc : dlen;
 }
 
+/**
+ * tipc_send_group_bcast - send message to all members in communication group
+ * @sk: socket structure
+ * @m: message to send
+ * @dlen: total length of message data
+ * @timeout: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
+				 int dlen, long timeout)
+{
+	struct sock *sk = sock->sk;
+	struct net *net = sock_net(sk);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_nlist *dsts = tipc_group_dests(grp);
+	struct tipc_mc_method *method = &tsk->mc_method;
+	struct tipc_msg *hdr = &tsk->phdr;
+	int mtu = tipc_bcast_get_mtu(net);
+	struct sk_buff_head pkts;
+	int rc = -EHOSTUNREACH;
+
+	if (!dsts->local && !dsts->remote)
+		return -EHOSTUNREACH;
+
+	/* Block or return if any destination link is congested */
+	rc = tipc_wait_for_cond(sock, &timeout,	!tsk->cong_link_cnt);
+	if (unlikely(rc))
+		return rc;
+
+	/* Complete message header */
+	msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
+	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+	msg_set_destport(hdr, 0);
+	msg_set_destnode(hdr, 0);
+	msg_set_nameinst(hdr, 0);
+	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
+
+	/* Build message as chain of buffers */
+	skb_queue_head_init(&pkts);
+	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+	if (unlikely(rc != dlen))
+		return rc;
+
+	/* Send message */
+	rc = tipc_mcast_xmit(net, &pkts, method, dsts,
+			     &tsk->cong_link_cnt);
+	if (unlikely(rc))
+		return rc;
+
+	/* Update broadcast sequence number */
+	tipc_group_update_bc_members(tsk->group);
+
+	return dlen;
+}
+
 /**
  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
  * @arrvq: queue with arriving messages, to be cloned after destination lookup
@@ -803,13 +873,15 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 		       struct sk_buff_head *inputq)
 {
-	struct tipc_msg *msg;
-	struct list_head dports;
-	u32 portid;
 	u32 scope = TIPC_CLUSTER_SCOPE;
-	struct sk_buff_head tmpq;
-	uint hsz;
+	u32 self = tipc_own_addr(net);
 	struct sk_buff *skb, *_skb;
+	u32 lower = 0, upper = ~0;
+	struct sk_buff_head tmpq;
+	u32 portid, oport, onode;
+	struct list_head dports;
+	struct tipc_msg *msg;
+	int hsz;
 
 	__skb_queue_head_init(&tmpq);
 	INIT_LIST_HEAD(&dports);
@@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
 		msg = buf_msg(skb);
 		hsz = skb_headroom(skb) + msg_hdr_sz(msg);
-
-		if (in_own_node(net, msg_orignode(msg)))
+		oport = msg_origport(msg);
+		onode = msg_orignode(msg);
+		if (onode == self)
 			scope = TIPC_NODE_SCOPE;
 
 		/* Create destination port list and message clones: */
-		tipc_nametbl_mc_translate(net,
-					  msg_nametype(msg), msg_namelower(msg),
-					  msg_nameupper(msg), scope, &dports);
+		if (!msg_in_group(msg)) {
+			lower = msg_namelower(msg);
+			upper = msg_nameupper(msg);
+		}
+		tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
+					  scope, &dports);
 		while (tipc_dest_pop(&dports, NULL, &portid)) {
 			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
 			if (_skb) {
@@ -895,10 +971,6 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 	kfree_skb(skb);
 }
 
-static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
-{
-}
-
 /**
  * tipc_sendmsg - send message in connectionless manner
  * @sock: socket structure
@@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 	struct list_head *clinks = &tsk->cong_links;
 	bool syn = !tipc_sk_type_connectionless(sk);
+	struct tipc_group *grp = tsk->group;
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct tipc_name_seq *seq;
 	struct sk_buff_head pkts;
@@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
 		return -EMSGSIZE;
 
+	if (unlikely(grp))
+		return tipc_send_group_bcast(sock, m, dlen, timeout);
+
 	if (unlikely(!dest)) {
 		dest = &tsk->peer;
 		if (!syn || dest->family != AF_TIPC)
@@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 	struct sk_buff *skb = __skb_dequeue(inputq);
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = buf_msg(skb);
+	struct tipc_group *grp = tsk->group;
 
 	switch (msg_user(hdr)) {
 	case CONN_MANAGER:
@@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		tsk->cong_link_cnt--;
 		sk->sk_write_space(sk);
 		break;
+	case GROUP_PROTOCOL:
+		tipc_group_proto_rcv(grp, hdr, xmitq);
+		break;
 	case TOP_SRV:
-		tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
+		tipc_group_member_evt(tsk->group, skb, xmitq);
+		skb = NULL;
 		break;
 	default:
 		break;
@@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 {
 	bool sk_conn = !tipc_sk_type_connectionless(sk);
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group *grp = tsk->group;
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct net *net = sock_net(sk);
 	struct sk_buff_head inputq;
@@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 
 	if (unlikely(!msg_isdata(hdr)))
 		tipc_sk_proto_rcv(sk, &inputq, xmitq);
-	else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG))
+	else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG))
 		return kfree_skb(skb);
 
+	if (unlikely(grp))
+		tipc_group_filter_msg(grp, &inputq, xmitq);
+
 	/* Validate and add to receive buffer if there is space */
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
 		limit = rcvbuf_limit(sk, skb);
 		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
-		    (!sk_conn && msg_connected(hdr)))
+		    (!sk_conn && msg_connected(hdr)) ||
+		    (!grp && msg_in_group(hdr)))
 			err = TIPC_ERR_NO_PORT;
 		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
 			err = TIPC_ERR_OVERLOAD;
@@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 			sock_put(sk);
 			continue;
 		}
-
 		/* No destination socket => dequeue skb if still there */
 		skb = tipc_skb_dequeue(inputq, dport);
 		if (!skb)
@@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
 
 	lock_sock(sk);
 
+	if (tsk->group) {
+		res = -EINVAL;
+		goto exit;
+	}
+
 	if (dst->family == AF_UNSPEC) {
 		memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
 		if (!tipc_sk_type_connectionless(sk))
@@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net)
 	rhashtable_destroy(&tn->sk_rht);
 }
 
+static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
+{
+	struct net *net = sock_net(&tsk->sk);
+	u32 domain = addr_domain(net, mreq->scope);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_msg *hdr = &tsk->phdr;
+	struct tipc_name_seq seq;
+	int rc;
+
+	if (mreq->type < TIPC_RESERVED_TYPES)
+		return -EACCES;
+	if (grp)
+		return -EACCES;
+	grp = tipc_group_create(net, tsk->portid, mreq);
+	if (!grp)
+		return -ENOMEM;
+	tsk->group = grp;
+	msg_set_lookup_scope(hdr, mreq->scope);
+	msg_set_nametype(hdr, mreq->type);
+	msg_set_dest_droppable(hdr, true);
+	seq.type = mreq->type;
+	seq.lower = mreq->instance;
+	seq.upper = seq.lower;
+	tipc_nametbl_build_group(net, grp, mreq->type, domain);
+	rc = tipc_sk_publish(tsk, mreq->scope, &seq);
+	if (rc)
+		tipc_group_delete(net, grp);
+	return rc;
+}
+
+static int tipc_sk_leave(struct tipc_sock *tsk)
+{
+	struct net *net = sock_net(&tsk->sk);
+	struct tipc_group *grp = tsk->group;
+	struct tipc_name_seq seq;
+	int scope;
+
+	if (!grp)
+		return -EINVAL;
+	tipc_group_self(grp, &seq, &scope);
+	tipc_group_delete(net, grp);
+	tsk->group = NULL;
+	tipc_sk_withdraw(tsk, scope, &seq);
+	return 0;
+}
+
 /**
  * tipc_setsockopt - set socket option
  * @sock: socket structure
@@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_group_req mreq;
 	u32 value = 0;
 	int res = 0;
 
@@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 	case TIPC_CONN_TIMEOUT:
 		if (ol < sizeof(value))
 			return -EINVAL;
-		res = get_user(value, (u32 __user *)ov);
-		if (res)
-			return res;
+		if (get_user(value, (u32 __user *)ov))
+			return -EFAULT;
+		break;
+	case TIPC_GROUP_JOIN:
+		if (ol < sizeof(mreq))
+			return -EINVAL;
+		if (copy_from_user(&mreq, ov, sizeof(mreq)))
+			return -EFAULT;
 		break;
 	default:
 		if (ov || ol)
@@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 		tsk->mc_method.rcast = true;
 		tsk->mc_method.mandatory = true;
 		break;
+	case TIPC_GROUP_JOIN:
+		res = tipc_sk_join(tsk, &mreq);
+		break;
+	case TIPC_GROUP_LEAVE:
+		res = tipc_sk_leave(tsk);
+		break;
 	default:
 		res = -EINVAL;
 	}
@@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	int len;
+	struct tipc_name_seq seq;
+	int len, scope;
 	u32 value;
 	int res;
 
@@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
 	case TIPC_SOCK_RECVQ_DEPTH:
 		value = skb_queue_len(&sk->sk_receive_queue);
 		break;
+	case TIPC_GROUP_JOIN:
+		seq.type = 0;
+		if (tsk->group)
+			tipc_group_self(tsk->group, &seq, &scope);
+		value = seq.type;
+		break;
 	default:
 		res = -EINVAL;
 	}