diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 55092feeaf36e3715b8086a8087cda2776fbe341..5560d60c1ff596ed21bd5db4daf9579daf57487e 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -319,6 +319,11 @@ bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc)
 		if (!xskq_cons_peek_desc(xs->tx, desc, umem))
 			continue;
 
+		/* This is the backpreassure mechanism for the Tx path.
+		 * Reserve space in the completion queue and only proceed
+		 * if there is space in it. This avoids having to implement
+		 * any buffering in the Tx path.
+		 */
 		if (xskq_prod_reserve_addr(umem->cq, desc->addr))
 			goto out;
 
@@ -389,6 +394,11 @@ static int xsk_generic_xmit(struct sock *sk)
 		addr = desc.addr;
 		buffer = xdp_umem_get_data(xs->umem, addr);
 		err = skb_store_bits(skb, 0, buffer, len);
+		/* This is the backpreassure mechanism for the Tx path.
+		 * Reserve space in the completion queue and only proceed
+		 * if there is space in it. This avoids having to implement
+		 * any buffering in the Tx path.
+		 */
 		if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) {
 			kfree_skb(skb);
 			goto out;
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 4c049cacc69329a3cabff539c92d6ae4e9c86951..bec2af11853addb48c658b7e046383ef8025169f 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -81,60 +81,27 @@ struct xsk_queue {
  * now and again after circling through the ring.
  */
 
-/* Common functions operating for both RXTX and umem queues */
-
-static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
-{
-	return q ? q->invalid_descs : 0;
-}
-
-static inline void __xskq_cons_release(struct xsk_queue *q)
-{
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
-}
-
-static inline void __xskq_cons_peek(struct xsk_queue *q)
-{
-	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
-}
-
-static inline void xskq_cons_get_entries(struct xsk_queue *q)
-{
-	__xskq_cons_release(q);
-	__xskq_cons_peek(q);
-}
-
-static inline bool xskq_prod_is_full(struct xsk_queue *q)
-{
-	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
-
-	if (free_entries)
-		return false;
-
-	/* Refresh the local tail pointer */
-	q->cached_cons = READ_ONCE(q->ring->consumer);
-	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
-
-	return !free_entries;
-}
-
-static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
-{
-	u32 entries = q->cached_prod - q->cached_cons;
-
-	if (entries >= cnt)
-		return true;
-
-	__xskq_cons_peek(q);
-	entries = q->cached_prod - q->cached_cons;
-
-	return entries >= cnt;
-}
+/* The operations on the rings are the following:
+ *
+ * producer                           consumer
+ *
+ * RESERVE entries                    PEEK in the ring for entries
+ * WRITE data into the ring           READ data from the ring
+ * SUBMIT entries                     RELEASE entries
+ *
+ * The producer reserves one or more entries in the ring. It can then
+ * fill in these entries and finally submit them so that they can be
+ * seen and read by the consumer.
+ *
+ * The consumer peeks into the ring to see if the producer has written
+ * any new entries. If so, the producer can then read these entries
+ * and when it is done reading them release them back to the producer
+ * so that the producer can use these slots to fill in new entries.
+ *
+ * The function names below reflect these operations.
+ */
 
-/* UMEM queue */
+/* Functions that read and validate content from consumer rings. */
 
 static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
 						   u64 addr,
@@ -148,16 +115,6 @@ static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
 	return cross_pg && !next_pg_contig;
 }
 
-static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
-{
-	if (addr >= q->size) {
-		q->invalid_descs++;
-		return false;
-	}
-
-	return true;
-}
-
 static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
 						u64 addr,
 						u64 length,
@@ -175,6 +132,16 @@ static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
 	return true;
 }
 
+static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
+{
+	if (addr >= q->size) {
+		q->invalid_descs++;
+		return false;
+	}
+
+	return true;
+}
+
 static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
 				       struct xdp_umem *umem)
 {
@@ -203,75 +170,6 @@ static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
 	return false;
 }
 
-static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
-				       struct xdp_umem *umem)
-{
-	if (q->cached_prod == q->cached_cons)
-		xskq_cons_get_entries(q);
-	return xskq_cons_read_addr(q, addr, umem);
-}
-
-static inline void xskq_cons_release(struct xsk_queue *q)
-{
-	/* To improve performance, only update local state here.
-	 * Do the actual release operation when we get new entries
-	 * from the ring in xskq_cons_get_entries() instead.
-	 */
-	q->cached_cons++;
-}
-
-static inline int xskq_prod_reserve(struct xsk_queue *q)
-{
-	if (xskq_prod_is_full(q))
-		return -ENOSPC;
-
-	/* A, matches D */
-	q->cached_prod++;
-	return 0;
-}
-
-static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-
-	if (xskq_prod_is_full(q))
-		return -ENOSPC;
-
-	/* A, matches D */
-	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
-	return 0;
-}
-
-static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
-{
-	/* Order producer and data */
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
-}
-
-static inline void xskq_prod_submit(struct xsk_queue *q)
-{
-	__xskq_prod_submit(q, q->cached_prod);
-}
-
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-	u32 idx = q->ring->producer;
-
-	ring->desc[idx++ & q->ring_mask] = addr;
-
-	__xskq_prod_submit(q, idx);
-}
-
-static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
-{
-	__xskq_prod_submit(q, q->ring->producer + nb_entries);
-}
-
-/* Rx/Tx queue */
-
 static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
 					   struct xdp_desc *d,
 					   struct xdp_umem *umem)
@@ -318,6 +216,48 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
 	return false;
 }
 
+/* Functions for consumers */
+
+static inline void __xskq_cons_release(struct xsk_queue *q)
+{
+	smp_mb(); /* D, matches A */
+	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+}
+
+static inline void __xskq_cons_peek(struct xsk_queue *q)
+{
+	/* Refresh the local pointer */
+	q->cached_prod = READ_ONCE(q->ring->producer);
+	smp_rmb(); /* C, matches B */
+}
+
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
+{
+	__xskq_cons_release(q);
+	__xskq_cons_peek(q);
+}
+
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+	u32 entries = q->cached_prod - q->cached_cons;
+
+	if (entries >= cnt)
+		return true;
+
+	__xskq_cons_peek(q);
+	entries = q->cached_prod - q->cached_cons;
+
+	return entries >= cnt;
+}
+
+static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+				       struct xdp_umem *umem)
+{
+	if (q->cached_prod == q->cached_cons)
+		xskq_cons_get_entries(q);
+	return xskq_cons_read_addr(q, addr, umem);
+}
+
 static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
 				       struct xdp_desc *desc,
 				       struct xdp_umem *umem)
@@ -327,6 +267,60 @@ static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
 	return xskq_cons_read_desc(q, desc, umem);
 }
 
+static inline void xskq_cons_release(struct xsk_queue *q)
+{
+	/* To improve performance, only update local state here.
+	 * Reflect this to global state when we get new entries
+	 * from the ring in xskq_cons_get_entries().
+	 */
+	q->cached_cons++;
+}
+
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
+{
+	/* No barriers needed since data is not accessed */
+	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
+		q->nentries;
+}
+
+/* Functions for producers */
+
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+	if (free_entries)
+		return false;
+
+	/* Refresh the local tail pointer */
+	q->cached_cons = READ_ONCE(q->ring->consumer);
+	free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+	return !free_entries;
+}
+
+static inline int xskq_prod_reserve(struct xsk_queue *q)
+{
+	if (xskq_prod_is_full(q))
+		return -ENOSPC;
+
+	/* A, matches D */
+	q->cached_prod++;
+	return 0;
+}
+
+static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+	if (xskq_prod_is_full(q))
+		return -ENOSPC;
+
+	/* A, matches D */
+	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
+	return 0;
+}
+
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 					 u64 addr, u32 len)
 {
@@ -344,11 +338,31 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 	return 0;
 }
 
-static inline bool xskq_cons_is_full(struct xsk_queue *q)
+static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	/* No barriers needed since data is not accessed */
-	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
-		q->nentries;
+	smp_wmb(); /* B, matches C */
+
+	WRITE_ONCE(q->ring->producer, idx);
+}
+
+static inline void xskq_prod_submit(struct xsk_queue *q)
+{
+	__xskq_prod_submit(q, q->cached_prod);
+}
+
+static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+	u32 idx = q->ring->producer;
+
+	ring->desc[idx++ & q->ring_mask] = addr;
+
+	__xskq_prod_submit(q, idx);
+}
+
+static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
+{
+	__xskq_prod_submit(q, q->ring->producer + nb_entries);
 }
 
 static inline bool xskq_prod_is_empty(struct xsk_queue *q)
@@ -357,6 +371,13 @@ static inline bool xskq_prod_is_empty(struct xsk_queue *q)
 	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
 }
 
+/* For both producers and consumers */
+
+static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+{
+	return q ? q->invalid_descs : 0;
+}
+
 void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
 void xskq_destroy(struct xsk_queue *q_ops);