diff --git a/Documentation/filesystems/locking.rst b/Documentation/filesystems/locking.rst
index 6e833ed0b22ea76515c394c87fb48e800a20f286..b7e5a3841aa4a31228a579aeded9912d41e3fb54 100644
--- a/Documentation/filesystems/locking.rst
+++ b/Documentation/filesystems/locking.rst
@@ -376,10 +376,17 @@ invalidate_lock before invalidating page cache in truncate / hole punch
 path (and thus calling into ->invalidate_folio) to block races between page
 cache invalidation and page cache filling functions (fault, read, ...).
 
-->release_folio() is called when the kernel is about to try to drop the
-buffers from the folio in preparation for freeing it.  It returns false to
-indicate that the buffers are (or may be) freeable.  If ->release_folio is
-NULL, the kernel assumes that the fs has no private interest in the buffers.
+->release_folio() is called when the MM wants to make a change to the
+folio that would invalidate the filesystem's private data.  For example,
+it may be about to be removed from the address_space or split.  The folio
+is locked and not under writeback.  It may be dirty.  The gfp parameter
+is not usually used for allocation, but rather to indicate what the
+filesystem may do to attempt to free the private data.  The filesystem may
+return false to indicate that the folio's private data cannot be freed.
+If it returns true, it should have already removed the private data from
+the folio.  If a filesystem does not provide a ->release_folio method,
+the pagecache will assume that private data is buffer_heads and call
+try_to_free_buffers().
 
 ->free_folio() is called when the kernel has dropped the folio
 from the page cache.
diff --git a/fs/btrfs/file.c b/fs/btrfs/file.c
index b9e75c9f95acfeea57a5dea55ae5a96a9b5b78c0..3887a8e1c964954323e626992ac50e897460ff25 100644
--- a/fs/btrfs/file.c
+++ b/fs/btrfs/file.c
@@ -876,9 +876,9 @@ static int prepare_uptodate_page(struct inode *inode,
 	return 0;
 }
 
-static unsigned int get_prepare_fgp_flags(bool nowait)
+static fgf_t get_prepare_fgp_flags(bool nowait)
 {
-	unsigned int fgp_flags = FGP_LOCK | FGP_ACCESSED | FGP_CREAT;
+	fgf_t fgp_flags = FGP_LOCK | FGP_ACCESSED | FGP_CREAT;
 
 	if (nowait)
 		fgp_flags |= FGP_NOWAIT;
@@ -910,7 +910,7 @@ static noinline int prepare_pages(struct inode *inode, struct page **pages,
 	int i;
 	unsigned long index = pos >> PAGE_SHIFT;
 	gfp_t mask = get_prepare_gfp_flags(inode, nowait);
-	unsigned int fgp_flags = get_prepare_fgp_flags(nowait);
+	fgf_t fgp_flags = get_prepare_fgp_flags(nowait);
 	int err = 0;
 	int faili;
 
diff --git a/fs/f2fs/compress.c b/fs/f2fs/compress.c
index 236d890f560b0bce8c2d40c7675ab0d5a499b9e6..0f7df9c11af3150510646990df743dfc6e8b0c00 100644
--- a/fs/f2fs/compress.c
+++ b/fs/f2fs/compress.c
@@ -1045,7 +1045,7 @@ static int prepare_compress_overwrite(struct compress_ctx *cc,
 	struct address_space *mapping = cc->inode->i_mapping;
 	struct page *page;
 	sector_t last_block_in_bio;
-	unsigned fgp_flag = FGP_LOCK | FGP_WRITE | FGP_CREAT;
+	fgf_t fgp_flag = FGP_LOCK | FGP_WRITE | FGP_CREAT;
 	pgoff_t start_idx = start_idx_of_cluster(cc);
 	int i, ret;
 
diff --git a/fs/f2fs/f2fs.h b/fs/f2fs/f2fs.h
index e18272ae311924a53498d6d539d8320f00d7f8b7..613132339d72616d42ac30c73b21257ebfb491da 100644
--- a/fs/f2fs/f2fs.h
+++ b/fs/f2fs/f2fs.h
@@ -2736,7 +2736,7 @@ static inline struct page *f2fs_grab_cache_page(struct address_space *mapping,
 
 static inline struct page *f2fs_pagecache_get_page(
 				struct address_space *mapping, pgoff_t index,
-				int fgp_flags, gfp_t gfp_mask)
+				fgf_t fgp_flags, gfp_t gfp_mask)
 {
 	if (time_to_inject(F2FS_M_SB(mapping), FAULT_PAGE_GET))
 		return NULL;
diff --git a/fs/gfs2/aops.c b/fs/gfs2/aops.c
index ae49256b7c8c6244f1e219801351c8470a2ba2c6..9c4b26aec5803fbf89560a4ba20a40ce11308663 100644
--- a/fs/gfs2/aops.c
+++ b/fs/gfs2/aops.c
@@ -747,7 +747,7 @@ static const struct address_space_operations gfs2_aops = {
 	.writepages = gfs2_writepages,
 	.read_folio = gfs2_read_folio,
 	.readahead = gfs2_readahead,
-	.dirty_folio = filemap_dirty_folio,
+	.dirty_folio = iomap_dirty_folio,
 	.release_folio = iomap_release_folio,
 	.invalidate_folio = iomap_invalidate_folio,
 	.bmap = gfs2_bmap,
diff --git a/fs/gfs2/bmap.c b/fs/gfs2/bmap.c
index 45ea63f7167d5a0bd11789bf68bb3c45741737a2..f62366be7587b925d356201cea2689dda8b35a8e 100644
--- a/fs/gfs2/bmap.c
+++ b/fs/gfs2/bmap.c
@@ -971,7 +971,7 @@ gfs2_iomap_get_folio(struct iomap_iter *iter, loff_t pos, unsigned len)
 	if (status)
 		return ERR_PTR(status);
 
-	folio = iomap_get_folio(iter, pos);
+	folio = iomap_get_folio(iter, pos, len);
 	if (IS_ERR(folio))
 		gfs2_trans_end(sdp);
 	return folio;
diff --git a/fs/iomap/buffered-io.c b/fs/iomap/buffered-io.c
index aa8967cca1a31bf2dda5ad6c0ef893ddbfcc7da7..283fb96f660942fe3fad91d3b224d0779247f454 100644
--- a/fs/iomap/buffered-io.c
+++ b/fs/iomap/buffered-io.c
@@ -23,65 +23,169 @@
 
 #define IOEND_BATCH_SIZE	4096
 
+typedef int (*iomap_punch_t)(struct inode *inode, loff_t offset, loff_t length);
+
 /*
- * Structure allocated for each folio when block size < folio size
- * to track sub-folio uptodate status and I/O completions.
+ * Structure allocated for each folio to track per-block uptodate and dirty
+ * state, and I/O completions.
  */
-struct iomap_page {
+struct iomap_folio_state {
 	atomic_t		read_bytes_pending;
 	atomic_t		write_bytes_pending;
-	spinlock_t		uptodate_lock;
-	unsigned long		uptodate[];
+	spinlock_t		state_lock;
+
+	/*
+	 * Each block has two bits in this bitmap:
+	 * Bits [0..blocks_per_folio) hold the uptodate status.
+	 * Bits [blocks_per_folio..2*blocks_per_folio) hold the dirty status.
+	 */
+	unsigned long		state[];
 };
 
-static inline struct iomap_page *to_iomap_page(struct folio *folio)
+static struct bio_set iomap_ioend_bioset;
+
+static inline bool ifs_is_fully_uptodate(struct folio *folio,
+		struct iomap_folio_state *ifs)
 {
-	if (folio_test_private(folio))
-		return folio_get_private(folio);
-	return NULL;
+	struct inode *inode = folio->mapping->host;
+
+	return bitmap_full(ifs->state, i_blocks_per_folio(inode, folio));
 }
 
-static struct bio_set iomap_ioend_bioset;
+static inline bool ifs_block_is_uptodate(struct iomap_folio_state *ifs,
+		unsigned int block)
+{
+	return test_bit(block, ifs->state);
+}
+
+static void ifs_set_range_uptodate(struct folio *folio,
+		struct iomap_folio_state *ifs, size_t off, size_t len)
+{
+	struct inode *inode = folio->mapping->host;
+	unsigned int first_blk = off >> inode->i_blkbits;
+	unsigned int last_blk = (off + len - 1) >> inode->i_blkbits;
+	unsigned int nr_blks = last_blk - first_blk + 1;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ifs->state_lock, flags);
+	bitmap_set(ifs->state, first_blk, nr_blks);
+	if (ifs_is_fully_uptodate(folio, ifs))
+		folio_mark_uptodate(folio);
+	spin_unlock_irqrestore(&ifs->state_lock, flags);
+}
+
+static void iomap_set_range_uptodate(struct folio *folio, size_t off,
+		size_t len)
+{
+	struct iomap_folio_state *ifs = folio->private;
+
+	if (ifs)
+		ifs_set_range_uptodate(folio, ifs, off, len);
+	else
+		folio_mark_uptodate(folio);
+}
+
+static inline bool ifs_block_is_dirty(struct folio *folio,
+		struct iomap_folio_state *ifs, int block)
+{
+	struct inode *inode = folio->mapping->host;
+	unsigned int blks_per_folio = i_blocks_per_folio(inode, folio);
+
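+	/* Dirty bits live in the second half of ->state, after uptodate. */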
+	return test_bit(block + blks_per_folio, ifs->state);
+}
+
+static void ifs_clear_range_dirty(struct folio *folio,
+		struct iomap_folio_state *ifs, size_t off, size_t len)
+{
+	struct inode *inode = folio->mapping->host;
+	unsigned int blks_per_folio = i_blocks_per_folio(inode, folio);
+	unsigned int first_blk = (off >> inode->i_blkbits);
+	unsigned int last_blk = (off + len - 1) >> inode->i_blkbits;
+	unsigned int nr_blks = last_blk - first_blk + 1;
+	unsigned long flags;
 
-static struct iomap_page *
-iomap_page_create(struct inode *inode, struct folio *folio, unsigned int flags)
+	spin_lock_irqsave(&ifs->state_lock, flags);
+	bitmap_clear(ifs->state, first_blk + blks_per_folio, nr_blks);
+	spin_unlock_irqrestore(&ifs->state_lock, flags);
+}
+
+static void iomap_clear_range_dirty(struct folio *folio, size_t off, size_t len)
 {
-	struct iomap_page *iop = to_iomap_page(folio);
+	struct iomap_folio_state *ifs = folio->private;
+
+	if (ifs)
+		ifs_clear_range_dirty(folio, ifs, off, len);
+}
+
+static void ifs_set_range_dirty(struct folio *folio,
+		struct iomap_folio_state *ifs, size_t off, size_t len)
+{
+	struct inode *inode = folio->mapping->host;
+	unsigned int blks_per_folio = i_blocks_per_folio(inode, folio);
+	unsigned int first_blk = (off >> inode->i_blkbits);
+	unsigned int last_blk = (off + len - 1) >> inode->i_blkbits;
+	unsigned int nr_blks = last_blk - first_blk + 1;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ifs->state_lock, flags);
+	bitmap_set(ifs->state, first_blk + blks_per_folio, nr_blks);
+	spin_unlock_irqrestore(&ifs->state_lock, flags);
+}
+
+static void iomap_set_range_dirty(struct folio *folio, size_t off, size_t len)
+{
+	struct iomap_folio_state *ifs = folio->private;
+
+	if (ifs)
+		ifs_set_range_dirty(folio, ifs, off, len);
+}
+
+static struct iomap_folio_state *ifs_alloc(struct inode *inode,
+		struct folio *folio, unsigned int flags)
+{
+	struct iomap_folio_state *ifs = folio->private;
 	unsigned int nr_blocks = i_blocks_per_folio(inode, folio);
 	gfp_t gfp;
 
-	if (iop || nr_blocks <= 1)
-		return iop;
+	if (ifs || nr_blocks <= 1)
+		return ifs;
 
 	if (flags & IOMAP_NOWAIT)
 		gfp = GFP_NOWAIT;
 	else
 		gfp = GFP_NOFS | __GFP_NOFAIL;
 
-	iop = kzalloc(struct_size(iop, uptodate, BITS_TO_LONGS(nr_blocks)),
-		      gfp);
-	if (iop) {
-		spin_lock_init(&iop->uptodate_lock);
-		if (folio_test_uptodate(folio))
-			bitmap_fill(iop->uptodate, nr_blocks);
-		folio_attach_private(folio, iop);
-	}
-	return iop;
+	/*
+	 * ifs->state tracks two sets of state flags when the
+	 * filesystem block size is smaller than the folio size.
+	 * The first set tracks per-block uptodate state and the
+	 * second tracks per-block dirty state.
+	 */
+	ifs = kzalloc(struct_size(ifs, state,
+		      BITS_TO_LONGS(2 * nr_blocks)), gfp);
+	if (!ifs)
+		return ifs;
+
+	spin_lock_init(&ifs->state_lock);
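+	/* Seed the per-block bits from the folio-wide uptodate/dirty flags. */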
+	if (folio_test_uptodate(folio))
+		bitmap_set(ifs->state, 0, nr_blocks);
+	if (folio_test_dirty(folio))
+		bitmap_set(ifs->state, nr_blocks, nr_blocks);
+	folio_attach_private(folio, ifs);
+
+	return ifs;
 }
 
-static void iomap_page_release(struct folio *folio)
+static void ifs_free(struct folio *folio)
 {
-	struct iomap_page *iop = folio_detach_private(folio);
-	struct inode *inode = folio->mapping->host;
-	unsigned int nr_blocks = i_blocks_per_folio(inode, folio);
+	struct iomap_folio_state *ifs = folio_detach_private(folio);
 
-	if (!iop)
+	if (!ifs)
 		return;
-	WARN_ON_ONCE(atomic_read(&iop->read_bytes_pending));
-	WARN_ON_ONCE(atomic_read(&iop->write_bytes_pending));
-	WARN_ON_ONCE(bitmap_full(iop->uptodate, nr_blocks) !=
+	WARN_ON_ONCE(atomic_read(&ifs->read_bytes_pending));
+	WARN_ON_ONCE(atomic_read(&ifs->write_bytes_pending));
+	WARN_ON_ONCE(ifs_is_fully_uptodate(folio, ifs) !=
 			folio_test_uptodate(folio));
-	kfree(iop);
+	kfree(ifs);
 }
 
 /*
@@ -90,7 +194,7 @@ static void iomap_page_release(struct folio *folio)
 static void iomap_adjust_read_range(struct inode *inode, struct folio *folio,
 		loff_t *pos, loff_t length, size_t *offp, size_t *lenp)
 {
-	struct iomap_page *iop = to_iomap_page(folio);
+	struct iomap_folio_state *ifs = folio->private;
 	loff_t orig_pos = *pos;
 	loff_t isize = i_size_read(inode);
 	unsigned block_bits = inode->i_blkbits;
@@ -105,12 +209,12 @@ static void iomap_adjust_read_range(struct inode *inode, struct folio *folio,
 	 * per-block uptodate status and adjust the offset and length if needed
 	 * to avoid reading in already uptodate ranges.
 	 */
-	if (iop) {
+	if (ifs) {
 		unsigned int i;
 
 		/* move forward for each leading block marked uptodate */
 		for (i = first; i <= last; i++) {
-			if (!test_bit(i, iop->uptodate))
+			if (!ifs_block_is_uptodate(ifs, i))
 				break;
 			*pos += block_size;
 			poff += block_size;
@@ -120,7 +224,7 @@ static void iomap_adjust_read_range(struct inode *inode, struct folio *folio,
 
 		/* truncate len if we find any trailing uptodate block(s) */
 		for ( ; i <= last; i++) {
-			if (test_bit(i, iop->uptodate)) {
+			if (ifs_block_is_uptodate(ifs, i)) {
 				plen -= (last - i + 1) * block_size;
 				last = i - 1;
 				break;
@@ -144,43 +248,19 @@ static void iomap_adjust_read_range(struct inode *inode, struct folio *folio,
 	*lenp = plen;
 }
 
-static void iomap_iop_set_range_uptodate(struct folio *folio,
-		struct iomap_page *iop, size_t off, size_t len)
-{
-	struct inode *inode = folio->mapping->host;
-	unsigned first = off >> inode->i_blkbits;
-	unsigned last = (off + len - 1) >> inode->i_blkbits;
-	unsigned long flags;
-
-	spin_lock_irqsave(&iop->uptodate_lock, flags);
-	bitmap_set(iop->uptodate, first, last - first + 1);
-	if (bitmap_full(iop->uptodate, i_blocks_per_folio(inode, folio)))
-		folio_mark_uptodate(folio);
-	spin_unlock_irqrestore(&iop->uptodate_lock, flags);
-}
-
-static void iomap_set_range_uptodate(struct folio *folio,
-		struct iomap_page *iop, size_t off, size_t len)
-{
-	if (iop)
-		iomap_iop_set_range_uptodate(folio, iop, off, len);
-	else
-		folio_mark_uptodate(folio);
-}
-
 static void iomap_finish_folio_read(struct folio *folio, size_t offset,
 		size_t len, int error)
 {
-	struct iomap_page *iop = to_iomap_page(folio);
+	struct iomap_folio_state *ifs = folio->private;
 
 	if (unlikely(error)) {
 		folio_clear_uptodate(folio);
 		folio_set_error(folio);
 	} else {
-		iomap_set_range_uptodate(folio, iop, offset, len);
+		iomap_set_range_uptodate(folio, offset, len);
 	}
 
-	if (!iop || atomic_sub_and_test(len, &iop->read_bytes_pending))
+	if (!ifs || atomic_sub_and_test(len, &ifs->read_bytes_pending))
 		folio_unlock(folio);
 }
 
@@ -213,7 +293,6 @@ struct iomap_readpage_ctx {
 static int iomap_read_inline_data(const struct iomap_iter *iter,
 		struct folio *folio)
 {
-	struct iomap_page *iop;
 	const struct iomap *iomap = iomap_iter_srcmap(iter);
 	size_t size = i_size_read(iter->inode) - iomap->offset;
 	size_t poff = offset_in_page(iomap->offset);
@@ -231,15 +310,13 @@ static int iomap_read_inline_data(const struct iomap_iter *iter,
 	if (WARN_ON_ONCE(size > iomap->length))
 		return -EIO;
 	if (offset > 0)
-		iop = iomap_page_create(iter->inode, folio, iter->flags);
-	else
-		iop = to_iomap_page(folio);
+		ifs_alloc(iter->inode, folio, iter->flags);
 
 	addr = kmap_local_folio(folio, offset);
 	memcpy(addr, iomap->inline_data, size);
 	memset(addr + size, 0, PAGE_SIZE - poff - size);
 	kunmap_local(addr);
-	iomap_set_range_uptodate(folio, iop, offset, PAGE_SIZE - poff);
+	iomap_set_range_uptodate(folio, offset, PAGE_SIZE - poff);
 	return 0;
 }
 
@@ -260,7 +337,7 @@ static loff_t iomap_readpage_iter(const struct iomap_iter *iter,
 	loff_t pos = iter->pos + offset;
 	loff_t length = iomap_length(iter) - offset;
 	struct folio *folio = ctx->cur_folio;
-	struct iomap_page *iop;
+	struct iomap_folio_state *ifs;
 	loff_t orig_pos = pos;
 	size_t poff, plen;
 	sector_t sector;
@@ -269,20 +346,20 @@ static loff_t iomap_readpage_iter(const struct iomap_iter *iter,
 		return iomap_read_inline_data(iter, folio);
 
 	/* zero post-eof blocks as the page may be mapped */
-	iop = iomap_page_create(iter->inode, folio, iter->flags);
+	ifs = ifs_alloc(iter->inode, folio, iter->flags);
 	iomap_adjust_read_range(iter->inode, folio, &pos, length, &poff, &plen);
 	if (plen == 0)
 		goto done;
 
 	if (iomap_block_needs_zeroing(iter, pos)) {
 		folio_zero_range(folio, poff, plen);
-		iomap_set_range_uptodate(folio, iop, poff, plen);
+		iomap_set_range_uptodate(folio, poff, plen);
 		goto done;
 	}
 
 	ctx->cur_folio_in_bio = true;
-	if (iop)
-		atomic_add(plen, &iop->read_bytes_pending);
+	if (ifs)
+		atomic_add(plen, &ifs->read_bytes_pending);
 
 	sector = iomap_sector(iomap, pos);
 	if (!ctx->bio ||
@@ -436,11 +513,11 @@ EXPORT_SYMBOL_GPL(iomap_readahead);
  */
 bool iomap_is_partially_uptodate(struct folio *folio, size_t from, size_t count)
 {
-	struct iomap_page *iop = to_iomap_page(folio);
+	struct iomap_folio_state *ifs = folio->private;
 	struct inode *inode = folio->mapping->host;
 	unsigned first, last, i;
 
-	if (!iop)
+	if (!ifs)
 		return false;
 
 	/* Caller's range may extend past the end of this folio */
@@ -451,7 +528,7 @@ bool iomap_is_partially_uptodate(struct folio *folio, size_t from, size_t count)
 	last = (from + count - 1) >> inode->i_blkbits;
 
 	for (i = first; i <= last; i++)
-		if (!test_bit(i, iop->uptodate))
+		if (!ifs_block_is_uptodate(ifs, i))
 			return false;
 	return true;
 }
@@ -461,16 +538,18 @@ EXPORT_SYMBOL_GPL(iomap_is_partially_uptodate);
  * iomap_get_folio - get a folio reference for writing
  * @iter: iteration structure
  * @pos: start offset of write
+ * @len: suggested size of the folio to create
  *
  * Returns a locked reference to the folio at @pos, or an error pointer if the
  * folio could not be obtained.
  */
-struct folio *iomap_get_folio(struct iomap_iter *iter, loff_t pos)
+struct folio *iomap_get_folio(struct iomap_iter *iter, loff_t pos, size_t len)
 {
-	unsigned fgp = FGP_WRITEBEGIN | FGP_NOFS;
+	fgf_t fgp = FGP_WRITEBEGIN | FGP_NOFS;
 
 	if (iter->flags & IOMAP_NOWAIT)
 		fgp |= FGP_NOWAIT;
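+	/* Encode the preferred folio order in the upper flag bits. */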
+	fgp |= fgf_set_order(len);
 
 	return __filemap_get_folio(iter->inode->i_mapping, pos >> PAGE_SHIFT,
 			fgp, mapping_gfp_mask(iter->inode->i_mapping));
@@ -483,14 +562,13 @@ bool iomap_release_folio(struct folio *folio, gfp_t gfp_flags)
 			folio_size(folio));
 
 	/*
-	 * mm accommodates an old ext3 case where clean folios might
-	 * not have had the dirty bit cleared.  Thus, it can send actual
-	 * dirty folios to ->release_folio() via shrink_active_list();
-	 * skip those here.
+	 * If the folio is dirty, we refuse to release our metadata because
+	 * it may be partially dirty.  Once we track per-block dirty state,
+	 * we can release the metadata if every block is dirty.
 	 */
-	if (folio_test_dirty(folio) || folio_test_writeback(folio))
+	if (folio_test_dirty(folio))
 		return false;
-	iomap_page_release(folio);
+	ifs_free(folio);
 	return true;
 }
 EXPORT_SYMBOL_GPL(iomap_release_folio);
@@ -507,16 +585,22 @@ void iomap_invalidate_folio(struct folio *folio, size_t offset, size_t len)
 	if (offset == 0 && len == folio_size(folio)) {
 		WARN_ON_ONCE(folio_test_writeback(folio));
 		folio_cancel_dirty(folio);
-		iomap_page_release(folio);
-	} else if (folio_test_large(folio)) {
-		/* Must release the iop so the page can be split */
-		WARN_ON_ONCE(!folio_test_uptodate(folio) &&
-			     folio_test_dirty(folio));
-		iomap_page_release(folio);
+		ifs_free(folio);
 	}
 }
 EXPORT_SYMBOL_GPL(iomap_invalidate_folio);
 
+bool iomap_dirty_folio(struct address_space *mapping, struct folio *folio)
+{
+	struct inode *inode = mapping->host;
+	size_t len = folio_size(folio);
+
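+	/*
+	 * Mark every block in the folio dirty; the folio-wide dirty flag is
+	 * still set via filemap_dirty_folio() below.
+	 */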
+	ifs_alloc(inode, folio, 0);
+	iomap_set_range_dirty(folio, 0, len);
+	return filemap_dirty_folio(mapping, folio);
+}
+EXPORT_SYMBOL_GPL(iomap_dirty_folio);
+
 static void
 iomap_write_failed(struct inode *inode, loff_t pos, unsigned len)
 {
@@ -547,7 +631,7 @@ static int __iomap_write_begin(const struct iomap_iter *iter, loff_t pos,
 		size_t len, struct folio *folio)
 {
 	const struct iomap *srcmap = iomap_iter_srcmap(iter);
-	struct iomap_page *iop;
+	struct iomap_folio_state *ifs;
 	loff_t block_size = i_blocksize(iter->inode);
 	loff_t block_start = round_down(pos, block_size);
 	loff_t block_end = round_up(pos + len, block_size);
@@ -555,14 +639,23 @@ static int __iomap_write_begin(const struct iomap_iter *iter, loff_t pos,
 	size_t from = offset_in_folio(folio, pos), to = from + len;
 	size_t poff, plen;
 
-	if (folio_test_uptodate(folio))
+	/*
+	 * If the write completely overlaps the current folio, then the
+	 * entire folio will be dirtied, so there is no need for
+	 * per-block state tracking structures to be attached to this folio.
+	 */
+	if (pos <= folio_pos(folio) &&
+	    pos + len >= folio_pos(folio) + folio_size(folio))
 		return 0;
-	folio_clear_error(folio);
 
-	iop = iomap_page_create(iter->inode, folio, iter->flags);
-	if ((iter->flags & IOMAP_NOWAIT) && !iop && nr_blocks > 1)
+	ifs = ifs_alloc(iter->inode, folio, iter->flags);
+	if ((iter->flags & IOMAP_NOWAIT) && !ifs && nr_blocks > 1)
 		return -EAGAIN;
 
+	if (folio_test_uptodate(folio))
+		return 0;
+	folio_clear_error(folio);
+
 	do {
 		iomap_adjust_read_range(iter->inode, folio, &block_start,
 				block_end - block_start, &poff, &plen);
@@ -589,7 +682,7 @@ static int __iomap_write_begin(const struct iomap_iter *iter, loff_t pos,
 			if (status)
 				return status;
 		}
-		iomap_set_range_uptodate(folio, iop, poff, plen);
+		iomap_set_range_uptodate(folio, poff, plen);
 	} while ((block_start += plen) < block_end);
 
 	return 0;
@@ -603,7 +696,7 @@ static struct folio *__iomap_get_folio(struct iomap_iter *iter, loff_t pos,
 	if (folio_ops && folio_ops->get_folio)
 		return folio_ops->get_folio(iter, pos, len);
 	else
-		return iomap_get_folio(iter, pos);
+		return iomap_get_folio(iter, pos, len);
 }
 
 static void __iomap_put_folio(struct iomap_iter *iter, loff_t pos, size_t ret,
@@ -696,7 +789,6 @@ static int iomap_write_begin(struct iomap_iter *iter, loff_t pos,
 static size_t __iomap_write_end(struct inode *inode, loff_t pos, size_t len,
 		size_t copied, struct folio *folio)
 {
-	struct iomap_page *iop = to_iomap_page(folio);
 	flush_dcache_folio(folio);
 
 	/*
@@ -712,7 +804,8 @@ static size_t __iomap_write_end(struct inode *inode, loff_t pos, size_t len,
 	 */
 	if (unlikely(copied < len && !folio_test_uptodate(folio)))
 		return 0;
-	iomap_set_range_uptodate(folio, iop, offset_in_folio(folio, pos), len);
+	iomap_set_range_uptodate(folio, offset_in_folio(folio, pos), len);
+	iomap_set_range_dirty(folio, offset_in_folio(folio, pos), copied);
 	filemap_dirty_folio(inode->i_mapping, folio);
 	return copied;
 }
@@ -773,6 +866,7 @@ static size_t iomap_write_end(struct iomap_iter *iter, loff_t pos, size_t len,
 static loff_t iomap_write_iter(struct iomap_iter *iter, struct iov_iter *i)
 {
 	loff_t length = iomap_length(iter);
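+	/* Copy in chunks of up to one max-order folio, shrinking on failure. */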
+	size_t chunk = PAGE_SIZE << MAX_PAGECACHE_ORDER;
 	loff_t pos = iter->pos;
 	ssize_t written = 0;
 	long status = 0;
@@ -781,15 +875,12 @@ static loff_t iomap_write_iter(struct iomap_iter *iter, struct iov_iter *i)
 
 	do {
 		struct folio *folio;
-		struct page *page;
-		unsigned long offset;	/* Offset into pagecache page */
-		unsigned long bytes;	/* Bytes to write to page */
+		size_t offset;		/* Offset into folio */
+		size_t bytes;		/* Bytes to write to folio */
 		size_t copied;		/* Bytes copied from user */
 
-		offset = offset_in_page(pos);
-		bytes = min_t(unsigned long, PAGE_SIZE - offset,
-						iov_iter_count(i));
-again:
+		offset = pos & (chunk - 1);
+		bytes = min(chunk - offset, iov_iter_count(i));
 		status = balance_dirty_pages_ratelimited_flags(mapping,
 							       bdp_flags);
 		if (unlikely(status))
@@ -819,12 +910,14 @@ static loff_t iomap_write_iter(struct iomap_iter *iter, struct iov_iter *i)
 		if (iter->iomap.flags & IOMAP_F_STALE)
 			break;
 
-		page = folio_file_page(folio, pos >> PAGE_SHIFT);
-		if (mapping_writably_mapped(mapping))
-			flush_dcache_page(page);
+		offset = offset_in_folio(folio, pos);
+		if (bytes > folio_size(folio) - offset)
+			bytes = folio_size(folio) - offset;
 
-		copied = copy_page_from_iter_atomic(page, offset, bytes, i);
+		if (mapping_writably_mapped(mapping))
+			flush_dcache_folio(folio);
 
+		copied = copy_folio_from_iter_atomic(folio, offset, bytes, i);
 		status = iomap_write_end(iter, pos, bytes, copied, folio);
 
 		if (unlikely(copied != status))
@@ -840,11 +933,13 @@ static loff_t iomap_write_iter(struct iomap_iter *iter, struct iov_iter *i)
 			 */
 			if (copied)
 				bytes = copied;
-			goto again;
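+			/*
+			 * Halve the chunk size so the next iteration
+			 * attempts a smaller copy.
+			 */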
+			if (chunk > PAGE_SIZE)
+				chunk /= 2;
+		} else {
+			pos += status;
+			written += status;
+			length -= status;
 		}
-		pos += status;
-		written += status;
-		length -= status;
 	} while (iov_iter_count(i) && length);
 
 	if (status == -EAGAIN) {
@@ -880,6 +975,76 @@ iomap_file_buffered_write(struct kiocb *iocb, struct iov_iter *i,
 }
 EXPORT_SYMBOL_GPL(iomap_file_buffered_write);
 
+static int iomap_write_delalloc_ifs_punch(struct inode *inode,
+		struct folio *folio, loff_t start_byte, loff_t end_byte,
+		iomap_punch_t punch)
+{
+	unsigned int first_blk, last_blk, i;
+	loff_t last_byte;
+	u8 blkbits = inode->i_blkbits;
+	struct iomap_folio_state *ifs;
+	int ret = 0;
+
+	/*
+	 * When we have per-block dirty tracking, there can be
+	 * blocks within a folio which are marked uptodate
+	 * but not dirty. In that case it is necessary to punch
+	 * out such blocks to avoid leaking any delalloc blocks.
+	 */
+	ifs = folio->private;
+	if (!ifs)
+		return ret;
+
+	last_byte = min_t(loff_t, end_byte - 1,
+			folio_pos(folio) + folio_size(folio) - 1);
+	first_blk = offset_in_folio(folio, start_byte) >> blkbits;
+	last_blk = offset_in_folio(folio, last_byte) >> blkbits;
+	for (i = first_blk; i <= last_blk; i++) {
+		if (!ifs_block_is_dirty(folio, ifs, i)) {
+			ret = punch(inode, folio_pos(folio) + (i << blkbits),
+				    1 << blkbits);
+			if (ret)
+				return ret;
+		}
+	}
+
+	return ret;
+}
+
+static int iomap_write_delalloc_punch(struct inode *inode, struct folio *folio,
+		loff_t *punch_start_byte, loff_t start_byte, loff_t end_byte,
+		iomap_punch_t punch)
+{
+	int ret = 0;
+
+	if (!folio_test_dirty(folio))
+		return ret;
+
+	/* if dirty, punch up to offset */
+	if (start_byte > *punch_start_byte) {
+		ret = punch(inode, *punch_start_byte,
+				start_byte - *punch_start_byte);
+		if (ret)
+			return ret;
+	}
+
+	/* Punch non-dirty blocks within folio */
+	ret = iomap_write_delalloc_ifs_punch(inode, folio, start_byte,
+			end_byte, punch);
+	if (ret)
+		return ret;
+
+	/*
+	 * Make sure the next punch start is correctly bound to
+	 * the end of this data range, not the end of the folio.
+	 */
+	*punch_start_byte = min_t(loff_t, end_byte,
+				folio_pos(folio) + folio_size(folio));
+
+	return ret;
+}
+
 /*
  * Scan the data range passed to us for dirty page cache folios. If we find a
 * dirty folio, punch out the preceding range and update the offset from which
@@ -899,10 +1064,11 @@ EXPORT_SYMBOL_GPL(iomap_file_buffered_write);
  */
 static int iomap_write_delalloc_scan(struct inode *inode,
 		loff_t *punch_start_byte, loff_t start_byte, loff_t end_byte,
-		int (*punch)(struct inode *inode, loff_t offset, loff_t length))
+		iomap_punch_t punch)
 {
 	while (start_byte < end_byte) {
 		struct folio	*folio;
+		int ret;
 
 		/* grab locked page */
 		folio = filemap_lock_folio(inode->i_mapping,
@@ -913,26 +1079,12 @@ static int iomap_write_delalloc_scan(struct inode *inode,
 			continue;
 		}
 
-		/* if dirty, punch up to offset */
-		if (folio_test_dirty(folio)) {
-			if (start_byte > *punch_start_byte) {
-				int	error;
-
-				error = punch(inode, *punch_start_byte,
-						start_byte - *punch_start_byte);
-				if (error) {
-					folio_unlock(folio);
-					folio_put(folio);
-					return error;
-				}
-			}
-
-			/*
-			 * Make sure the next punch start is correctly bound to
-			 * the end of this data range, not the end of the folio.
-			 */
-			*punch_start_byte = min_t(loff_t, end_byte,
-					folio_next_index(folio) << PAGE_SHIFT);
+		ret = iomap_write_delalloc_punch(inode, folio, punch_start_byte,
+						 start_byte, end_byte, punch);
+		if (ret) {
+			folio_unlock(folio);
+			folio_put(folio);
+			return ret;
 		}
 
 		/* move offset to start of next folio in range */
@@ -977,8 +1129,7 @@ static int iomap_write_delalloc_scan(struct inode *inode,
  * the code to subtle off-by-one bugs....
  */
 static int iomap_write_delalloc_release(struct inode *inode,
-		loff_t start_byte, loff_t end_byte,
-		int (*punch)(struct inode *inode, loff_t pos, loff_t length))
+		loff_t start_byte, loff_t end_byte, iomap_punch_t punch)
 {
 	loff_t punch_start_byte = start_byte;
 	loff_t scan_end_byte = min(i_size_read(inode), end_byte);
@@ -1071,8 +1222,7 @@ static int iomap_write_delalloc_release(struct inode *inode,
  */
 int iomap_file_buffered_write_punch_delalloc(struct inode *inode,
 		struct iomap *iomap, loff_t pos, loff_t length,
-		ssize_t written,
-		int (*punch)(struct inode *inode, loff_t pos, loff_t length))
+		ssize_t written, iomap_punch_t punch)
 {
 	loff_t			start_byte;
 	loff_t			end_byte;
@@ -1293,17 +1443,17 @@ EXPORT_SYMBOL_GPL(iomap_page_mkwrite);
 static void iomap_finish_folio_write(struct inode *inode, struct folio *folio,
 		size_t len, int error)
 {
-	struct iomap_page *iop = to_iomap_page(folio);
+	struct iomap_folio_state *ifs = folio->private;
 
 	if (error) {
 		folio_set_error(folio);
 		mapping_set_error(inode->i_mapping, error);
 	}
 
-	WARN_ON_ONCE(i_blocks_per_folio(inode, folio) > 1 && !iop);
-	WARN_ON_ONCE(iop && atomic_read(&iop->write_bytes_pending) <= 0);
+	WARN_ON_ONCE(i_blocks_per_folio(inode, folio) > 1 && !ifs);
+	WARN_ON_ONCE(ifs && atomic_read(&ifs->write_bytes_pending) <= 0);
 
-	if (!iop || atomic_sub_and_test(len, &iop->write_bytes_pending))
+	if (!ifs || atomic_sub_and_test(len, &ifs->write_bytes_pending))
 		folio_end_writeback(folio);
 }
 
@@ -1570,7 +1720,7 @@ iomap_can_add_to_ioend(struct iomap_writepage_ctx *wpc, loff_t offset,
  */
 static void
 iomap_add_to_ioend(struct inode *inode, loff_t pos, struct folio *folio,
-		struct iomap_page *iop, struct iomap_writepage_ctx *wpc,
+		struct iomap_folio_state *ifs, struct iomap_writepage_ctx *wpc,
 		struct writeback_control *wbc, struct list_head *iolist)
 {
 	sector_t sector = iomap_sector(&wpc->iomap, pos);
@@ -1588,8 +1738,8 @@ iomap_add_to_ioend(struct inode *inode, loff_t pos, struct folio *folio,
 		bio_add_folio_nofail(wpc->ioend->io_bio, folio, len, poff);
 	}
 
-	if (iop)
-		atomic_add(len, &iop->write_bytes_pending);
+	if (ifs)
+		atomic_add(len, &ifs->write_bytes_pending);
 	wpc->ioend->io_size += len;
 	wbc_account_cgroup_owner(wbc, &folio->page, len);
 }
@@ -1615,7 +1765,7 @@ iomap_writepage_map(struct iomap_writepage_ctx *wpc,
 		struct writeback_control *wbc, struct inode *inode,
 		struct folio *folio, u64 end_pos)
 {
-	struct iomap_page *iop = iomap_page_create(inode, folio, 0);
+	struct iomap_folio_state *ifs = folio->private;
 	struct iomap_ioend *ioend, *next;
 	unsigned len = i_blocksize(inode);
 	unsigned nblocks = i_blocks_per_folio(inode, folio);
@@ -1623,7 +1773,14 @@ iomap_writepage_map(struct iomap_writepage_ctx *wpc,
 	int error = 0, count = 0, i;
 	LIST_HEAD(submit_list);
 
-	WARN_ON_ONCE(iop && atomic_read(&iop->write_bytes_pending) != 0);
+	WARN_ON_ONCE(end_pos <= pos);
+
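+	/*
+	 * If no per-block state is attached (e.g. a NOWAIT allocation failed
+	 * earlier), we cannot tell which blocks are dirty, so treat all
+	 * blocks up to end_pos as dirty.
+	 */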
+	if (!ifs && nblocks > 1) {
+		ifs = ifs_alloc(inode, folio, 0);
+		iomap_set_range_dirty(folio, 0, end_pos - pos);
+	}
+
+	WARN_ON_ONCE(ifs && atomic_read(&ifs->write_bytes_pending) != 0);
 
 	/*
 	 * Walk through the folio to find areas to write back. If we
@@ -1631,7 +1788,7 @@ iomap_writepage_map(struct iomap_writepage_ctx *wpc,
 	 * invalid, grab a new one.
 	 */
 	for (i = 0; i < nblocks && pos < end_pos; i++, pos += len) {
-		if (iop && !test_bit(i, iop->uptodate))
+		if (ifs && !ifs_block_is_dirty(folio, ifs, i))
 			continue;
 
 		error = wpc->ops->map_blocks(wpc, inode, pos);
@@ -1642,7 +1799,7 @@ iomap_writepage_map(struct iomap_writepage_ctx *wpc,
 			continue;
 		if (wpc->iomap.type == IOMAP_HOLE)
 			continue;
-		iomap_add_to_ioend(inode, pos, folio, iop, wpc, wbc,
+		iomap_add_to_ioend(inode, pos, folio, ifs, wpc, wbc,
 				 &submit_list);
 		count++;
 	}
@@ -1675,6 +1832,12 @@ iomap_writepage_map(struct iomap_writepage_ctx *wpc,
 		}
 	}
 
+	/*
+	 * We can have dirty bits set past the end of file in the page_mkwrite
+	 * path while mapping the last partial folio. Hence it's better to
+	 * clear all the dirty bits in the folio here.
+	 */
+	iomap_clear_range_dirty(folio, 0, folio_size(folio));
 	folio_start_writeback(folio);
 	folio_unlock(folio);
 
diff --git a/fs/iomap/direct-io.c b/fs/iomap/direct-io.c
index ea3b868c8355df0ed39124f6d2e78702de742a4e..bcd3f8cf5ea42f0fa0e4967510230f7e5d25b1fa 100644
--- a/fs/iomap/direct-io.c
+++ b/fs/iomap/direct-io.c
@@ -20,10 +20,12 @@
  * Private flags for iomap_dio, must not overlap with the public ones in
  * iomap.h:
  */
-#define IOMAP_DIO_WRITE_FUA	(1 << 28)
-#define IOMAP_DIO_NEED_SYNC	(1 << 29)
-#define IOMAP_DIO_WRITE		(1 << 30)
-#define IOMAP_DIO_DIRTY		(1 << 31)
+#define IOMAP_DIO_CALLER_COMP	(1U << 26)
+#define IOMAP_DIO_INLINE_COMP	(1U << 27)
+#define IOMAP_DIO_WRITE_THROUGH	(1U << 28)
+#define IOMAP_DIO_NEED_SYNC	(1U << 29)
+#define IOMAP_DIO_WRITE		(1U << 30)
+#define IOMAP_DIO_DIRTY		(1U << 31)
 
 struct iomap_dio {
 	struct kiocb		*iocb;
@@ -41,7 +43,6 @@ struct iomap_dio {
 		struct {
 			struct iov_iter		*iter;
 			struct task_struct	*waiter;
-			struct bio		*poll_bio;
 		} submit;
 
 		/* used for aio completion: */
@@ -63,12 +64,14 @@ static struct bio *iomap_dio_alloc_bio(const struct iomap_iter *iter,
 static void iomap_dio_submit_bio(const struct iomap_iter *iter,
 		struct iomap_dio *dio, struct bio *bio, loff_t pos)
 {
+	struct kiocb *iocb = dio->iocb;
+
 	atomic_inc(&dio->ref);
 
 	/* Sync dio can't be polled reliably */
-	if ((dio->iocb->ki_flags & IOCB_HIPRI) && !is_sync_kiocb(dio->iocb)) {
-		bio_set_polled(bio, dio->iocb);
-		dio->submit.poll_bio = bio;
+	if ((iocb->ki_flags & IOCB_HIPRI) && !is_sync_kiocb(iocb)) {
+		bio_set_polled(bio, iocb);
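+		/* Publish the bio in the iocb so the issuer can poll for it. */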
+		WRITE_ONCE(iocb->private, bio);
 	}
 
 	if (dio->dops && dio->dops->submit_io)
@@ -130,6 +133,11 @@ ssize_t iomap_dio_complete(struct iomap_dio *dio)
 }
 EXPORT_SYMBOL_GPL(iomap_dio_complete);
 
+static ssize_t iomap_dio_deferred_complete(void *data)
+{
+	return iomap_dio_complete(data);
+}
+
 static void iomap_dio_complete_work(struct work_struct *work)
 {
 	struct iomap_dio *dio = container_of(work, struct iomap_dio, aio.work);
@@ -152,27 +160,69 @@ void iomap_dio_bio_end_io(struct bio *bio)
 {
 	struct iomap_dio *dio = bio->bi_private;
 	bool should_dirty = (dio->flags & IOMAP_DIO_DIRTY);
+	struct kiocb *iocb = dio->iocb;
 
 	if (bio->bi_status)
 		iomap_dio_set_error(dio, blk_status_to_errno(bio->bi_status));
+	if (!atomic_dec_and_test(&dio->ref))
+		goto release_bio;
 
-	if (atomic_dec_and_test(&dio->ref)) {
-		if (dio->wait_for_completion) {
-			struct task_struct *waiter = dio->submit.waiter;
-			WRITE_ONCE(dio->submit.waiter, NULL);
-			blk_wake_io_task(waiter);
-		} else if (dio->flags & IOMAP_DIO_WRITE) {
-			struct inode *inode = file_inode(dio->iocb->ki_filp);
-
-			WRITE_ONCE(dio->iocb->private, NULL);
-			INIT_WORK(&dio->aio.work, iomap_dio_complete_work);
-			queue_work(inode->i_sb->s_dio_done_wq, &dio->aio.work);
-		} else {
-			WRITE_ONCE(dio->iocb->private, NULL);
-			iomap_dio_complete_work(&dio->aio.work);
-		}
+	/*
+	 * For synchronous dio, the submitting task itself will handle any
+	 * completion work that is needed after IO. All we need to do is
+	 * wake the task.
+	 */
+	if (dio->wait_for_completion) {
+		struct task_struct *waiter = dio->submit.waiter;
+
+		WRITE_ONCE(dio->submit.waiter, NULL);
+		blk_wake_io_task(waiter);
+		goto release_bio;
+	}
+
+	/*
+	 * If flagged with IOMAP_DIO_INLINE_COMP, we can complete it inline.
+	 */
+	if (dio->flags & IOMAP_DIO_INLINE_COMP) {
+		WRITE_ONCE(iocb->private, NULL);
+		iomap_dio_complete_work(&dio->aio.work);
+		goto release_bio;
+	}
+
+	/*
+	 * If this dio is flagged with IOMAP_DIO_CALLER_COMP, then schedule
+	 * our completion that way to avoid an async punt to a workqueue.
+	 */
+	if (dio->flags & IOMAP_DIO_CALLER_COMP) {
+		/* only polled IO cares about ->private being cleared */
+		iocb->private = dio;
+		iocb->dio_complete = iomap_dio_deferred_complete;
+
+		/*
+		 * Invoke ->ki_complete() directly. We've assigned our
+		 * dio_complete callback handler, and since the issuer set
+		 * IOCB_DIO_CALLER_COMP, we know their ki_complete handler will
+		 * notice ->dio_complete being set and will defer calling that
+		 * handler until it can be done from a safe task context.
+		 *
+		 * Note that the 'res' being passed in here is not important
+		 * for this case. The actual completion value of the request
+		 * will be retrieved from ->dio_complete when that is run by the
+		 * issuer.
+		 */
+		iocb->ki_complete(iocb, 0);
+		goto release_bio;
 	}
 
+	/*
+	 * Async DIO completion that requires filesystem level completion work
+	 * gets punted to a work queue to complete as the operation may require
+	 * more IO to be issued to finalise filesystem metadata changes or
+	 * guarantee data integrity.
+	 */
+	INIT_WORK(&dio->aio.work, iomap_dio_complete_work);
+	queue_work(file_inode(iocb->ki_filp)->i_sb->s_dio_done_wq,
+			&dio->aio.work);
+release_bio:
 	if (should_dirty) {
 		bio_check_pages_dirty(bio);
 	} else {
@@ -203,7 +253,7 @@ static void iomap_dio_zero(const struct iomap_iter *iter, struct iomap_dio *dio,
 /*
  * Figure out the bio's operation flags from the dio request, the
  * mapping, and whether or not we want FUA.  Note that we can end up
- * clearing the WRITE_FUA flag in the dio request.
+ * clearing the WRITE_THROUGH flag in the dio request.
  */
 static inline blk_opf_t iomap_dio_bio_opflags(struct iomap_dio *dio,
 		const struct iomap *iomap, bool use_fua)
@@ -217,7 +267,7 @@ static inline blk_opf_t iomap_dio_bio_opflags(struct iomap_dio *dio,
 	if (use_fua)
 		opflags |= REQ_FUA;
 	else
-		dio->flags &= ~IOMAP_DIO_WRITE_FUA;
+		dio->flags &= ~IOMAP_DIO_WRITE_THROUGH;
 
 	return opflags;
 }
@@ -257,12 +307,19 @@ static loff_t iomap_dio_bio_iter(const struct iomap_iter *iter,
 		 * Use a FUA write if we need datasync semantics, this is a pure
 		 * data IO that doesn't require any metadata updates (including
 		 * after IO completion such as unwritten extent conversion) and
-		 * the underlying device supports FUA. This allows us to avoid
-		 * cache flushes on IO completion.
+		 * the underlying device either supports FUA or doesn't have
+		 * a volatile write cache. This allows us to avoid cache flushes
+		 * on IO completion. If we can't use writethrough and need to
+		 * sync, disable in-task completions as dio completion will
+		 * need to call generic_write_sync() which will do a blocking
+		 * fsync / cache flush call.
 		 */
 		if (!(iomap->flags & (IOMAP_F_SHARED|IOMAP_F_DIRTY)) &&
-		    (dio->flags & IOMAP_DIO_WRITE_FUA) && bdev_fua(iomap->bdev))
+		    (dio->flags & IOMAP_DIO_WRITE_THROUGH) &&
+		    (bdev_fua(iomap->bdev) || !bdev_write_cache(iomap->bdev)))
 			use_fua = true;
+		else if (dio->flags & IOMAP_DIO_NEED_SYNC)
+			dio->flags &= ~IOMAP_DIO_CALLER_COMP;
 	}
 
 	/*
@@ -277,10 +334,23 @@ static loff_t iomap_dio_bio_iter(const struct iomap_iter *iter,
 		goto out;
 
 	/*
-	 * We can only poll for single bio I/Os.
+	 * We can only do deferred completion for pure overwrites that
+	 * don't require additional IO at completion. This rules out
+	 * writes that need zeroing or extent conversion, extend
+	 * writes that need zeroing or extent conversion, that extend
+	 * the file size, or that issue journal IO or cache flushes
 	 */
 	if (need_zeroout ||
+	    ((dio->flags & IOMAP_DIO_NEED_SYNC) && !use_fua) ||
 	    ((dio->flags & IOMAP_DIO_WRITE) && pos >= i_size_read(inode)))
+		dio->flags &= ~IOMAP_DIO_CALLER_COMP;
+
+	/*
+	 * The rules for polled IO completions follow the same guidelines as the
+	 * ones we set for inline and deferred completions. If none of those
+	 * are available for this IO, clear the polled flag.
+	 */
+	if (!(dio->flags & (IOMAP_DIO_INLINE_COMP|IOMAP_DIO_CALLER_COMP)))
 		dio->iocb->ki_flags &= ~IOCB_HIPRI;
 
 	if (need_zeroout) {
@@ -505,12 +575,14 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 
 	dio->submit.iter = iter;
 	dio->submit.waiter = current;
-	dio->submit.poll_bio = NULL;
 
 	if (iocb->ki_flags & IOCB_NOWAIT)
 		iomi.flags |= IOMAP_NOWAIT;
 
 	if (iov_iter_rw(iter) == READ) {
+		/* reads can always complete inline */
+		dio->flags |= IOMAP_DIO_INLINE_COMP;
+
 		if (iomi.pos >= dio->i_size)
 			goto out_free_dio;
 
@@ -524,6 +596,15 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		iomi.flags |= IOMAP_WRITE;
 		dio->flags |= IOMAP_DIO_WRITE;
 
+		/*
+		 * Flag as supporting deferred completions, if the issuer
+		 * groks it. This can avoid a workqueue punt for writes.
+		 * We may later clear this flag if we need to do other IO
+		 * as part of this IO completion.
+		 */
+		if (iocb->ki_flags & IOCB_DIO_CALLER_COMP)
+			dio->flags |= IOMAP_DIO_CALLER_COMP;
+
 		if (dio_flags & IOMAP_DIO_OVERWRITE_ONLY) {
 			ret = -EAGAIN;
 			if (iomi.pos >= dio->i_size ||
@@ -537,13 +618,16 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 			dio->flags |= IOMAP_DIO_NEED_SYNC;
 
 		       /*
-			* For datasync only writes, we optimistically try
-			* using FUA for this IO.  Any non-FUA write that
-			* occurs will clear this flag, hence we know before
-			* completion whether a cache flush is necessary.
+			* For datasync only writes, we optimistically try using
+			* WRITE_THROUGH for this IO. This flag requires either
+			* FUA writes through the device's write cache, or a
+			* normal write to a device without a volatile write
+			* cache. For the former, any non-FUA write that occurs
+			* will clear this flag, hence we know before completion
+			* whether a cache flush is necessary.
 			*/
 			if (!(iocb->ki_flags & IOCB_SYNC))
-				dio->flags |= IOMAP_DIO_WRITE_FUA;
+				dio->flags |= IOMAP_DIO_WRITE_THROUGH;
 		}
 
 		/*
@@ -605,14 +689,13 @@ __iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		iomap_dio_set_error(dio, ret);
 
 	/*
-	 * If all the writes we issued were FUA, we don't need to flush the
-	 * cache on IO completion. Clear the sync flag for this case.
+	 * If all the writes we issued were already written through to the
+	 * media, we don't need to flush the cache on IO completion. Clear the
+	 * sync flag for this case.
 	 */
-	if (dio->flags & IOMAP_DIO_WRITE_FUA)
+	if (dio->flags & IOMAP_DIO_WRITE_THROUGH)
 		dio->flags &= ~IOMAP_DIO_NEED_SYNC;
 
-	WRITE_ONCE(iocb->private, dio->submit.poll_bio);
-
 	/*
 	 * We are about to drop our additional submission reference, which
 	 * might be the last reference to the dio.  There are three different
diff --git a/fs/xfs/xfs_aops.c b/fs/xfs/xfs_aops.c
index 451942fb38ec2142b6b5242ce874974127ec2b92..2fca4b4e7fd8f656116fba83f7e6701b915481c6 100644
--- a/fs/xfs/xfs_aops.c
+++ b/fs/xfs/xfs_aops.c
@@ -578,7 +578,7 @@ const struct address_space_operations xfs_address_space_operations = {
 	.read_folio		= xfs_vm_read_folio,
 	.readahead		= xfs_vm_readahead,
 	.writepages		= xfs_vm_writepages,
-	.dirty_folio		= filemap_dirty_folio,
+	.dirty_folio		= iomap_dirty_folio,
 	.release_folio		= iomap_release_folio,
 	.invalidate_folio	= iomap_invalidate_folio,
 	.bmap			= xfs_vm_bmap,
diff --git a/fs/zonefs/file.c b/fs/zonefs/file.c
index 789cfb74c1463ea1112cb094cb88241b230e84e9..b2c9b35df8f76d155ce51b5fecb8c04e98c834e0 100644
--- a/fs/zonefs/file.c
+++ b/fs/zonefs/file.c
@@ -175,7 +175,7 @@ const struct address_space_operations zonefs_file_aops = {
 	.read_folio		= zonefs_read_folio,
 	.readahead		= zonefs_readahead,
 	.writepages		= zonefs_writepages,
-	.dirty_folio		= filemap_dirty_folio,
+	.dirty_folio		= iomap_dirty_folio,
 	.release_folio		= iomap_release_folio,
 	.invalidate_folio	= iomap_invalidate_folio,
 	.migrate_folio		= filemap_migrate_folio,
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 4e270f3ed58e53a5e6138279872a9ccfdb263f27..dda08d97363936ec7ce72906fb8289b6d41f3994 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -338,6 +338,20 @@ enum rw_hint {
 #define IOCB_NOIO		(1 << 20)
 /* can use bio alloc cache */
 #define IOCB_ALLOC_CACHE	(1 << 21)
+/*
+ * IOCB_DIO_CALLER_COMP can be set by the iocb owner, to indicate that the
+ * iocb completion can be passed back to the owner for execution from a safe
+ * context rather than needing to be punted through a workqueue. If this
+ * flag is set, the bio completion handling may set iocb->dio_complete to a
+ * handler function and iocb->private to context information for that handler.
+ * The issuer should call the handler with that context information from task
+ * context to complete the processing of the iocb. Note that while this
+ * provides a task context for the dio_complete() callback, it should only be
+ * used on the completion side for non-IO generating completions. It's fine to
+ * call blocking functions from this callback, but they should not wait for
+ * unrelated IO (like cache flushing, new IO generation, etc.).
+ */
+#define IOCB_DIO_CALLER_COMP	(1 << 22)
 
 /* for use in trace events */
 #define TRACE_IOCB_STRINGS \
@@ -351,7 +365,8 @@ enum rw_hint {
 	{ IOCB_WRITE,		"WRITE" }, \
 	{ IOCB_WAITQ,		"WAITQ" }, \
 	{ IOCB_NOIO,		"NOIO" }, \
-	{ IOCB_ALLOC_CACHE,	"ALLOC_CACHE" }
+	{ IOCB_ALLOC_CACHE,	"ALLOC_CACHE" }, \
+	{ IOCB_DIO_CALLER_COMP,	"CALLER_COMP" }
 
 struct kiocb {
 	struct file		*ki_filp;
@@ -360,7 +375,23 @@ struct kiocb {
 	void			*private;
 	int			ki_flags;
 	u16			ki_ioprio; /* See linux/ioprio.h */
-	struct wait_page_queue	*ki_waitq; /* for async buffered IO */
+	union {
+		/*
+		 * Only used for async buffered reads, where it denotes the
+		 * page waitqueue associated with completing the read. Valid
+		 * IFF IOCB_WAITQ is set.
+		 */
+		struct wait_page_queue	*ki_waitq;
+		/*
+		 * Can be used for O_DIRECT IO, where the completion handling
+		 * is punted back to the issuer of the IO. May only be set
+		 * if IOCB_DIO_CALLER_COMP is set by the issuer, and the issuer
+		 * must then check for presence of this handler when ki_complete
+		 * is invoked. The data passed in to this handler must be
+		 * assigned to ->private when dio_complete is assigned.
+		 */
+		ssize_t (*dio_complete)(void *data);
+	};
 };
 
 static inline bool is_sync_kiocb(struct kiocb *kiocb)
diff --git a/include/linux/iomap.h b/include/linux/iomap.h
index e2b836c2e119ae7d574874ddd35732a4e30e7d3a..fdc6e64f49d63afabb6c0c2ebbd6aea132de6036 100644
--- a/include/linux/iomap.h
+++ b/include/linux/iomap.h
@@ -261,9 +261,10 @@ int iomap_file_buffered_write_punch_delalloc(struct inode *inode,
 int iomap_read_folio(struct folio *folio, const struct iomap_ops *ops);
 void iomap_readahead(struct readahead_control *, const struct iomap_ops *ops);
 bool iomap_is_partially_uptodate(struct folio *, size_t from, size_t count);
-struct folio *iomap_get_folio(struct iomap_iter *iter, loff_t pos);
+struct folio *iomap_get_folio(struct iomap_iter *iter, loff_t pos, size_t len);
 bool iomap_release_folio(struct folio *folio, gfp_t gfp_flags);
 void iomap_invalidate_folio(struct folio *folio, size_t offset, size_t len);
+bool iomap_dirty_folio(struct address_space *mapping, struct folio *folio);
 int iomap_file_unshare(struct inode *inode, loff_t pos, loff_t len,
 		const struct iomap_ops *ops);
 int iomap_zero_range(struct inode *inode, loff_t pos, loff_t len,
diff --git a/include/linux/pagemap.h b/include/linux/pagemap.h
index 716953ee1ebdb26e2be6c08f28f4a77b4e40ee54..d87840acbfb2314b878eced14049132b72751429 100644
--- a/include/linux/pagemap.h
+++ b/include/linux/pagemap.h
@@ -470,6 +470,19 @@ static inline void *detach_page_private(struct page *page)
 	return folio_detach_private(page_folio(page));
 }
 
+/*
+ * There are some parts of the kernel which assume that PMD entries
+ * are exactly HPAGE_PMD_ORDER.  Those should be fixed, but until then,
+ * limit the maximum allocation order to PMD size.  I'm not aware of any
+ * assumptions about maximum order if THP are disabled, but 8 seems like
+ * a good order (that's 1MB if you're using 4kB pages)
+ */
+#ifdef CONFIG_TRANSPARENT_HUGEPAGE
+#define MAX_PAGECACHE_ORDER	HPAGE_PMD_ORDER
+#else
+#define MAX_PAGECACHE_ORDER	8
+#endif
+
 #ifdef CONFIG_NUMA
 struct folio *filemap_alloc_folio(gfp_t gfp, unsigned int order);
 #else
@@ -501,22 +514,69 @@ pgoff_t page_cache_next_miss(struct address_space *mapping,
 pgoff_t page_cache_prev_miss(struct address_space *mapping,
 			     pgoff_t index, unsigned long max_scan);
 
-#define FGP_ACCESSED		0x00000001
-#define FGP_LOCK		0x00000002
-#define FGP_CREAT		0x00000004
-#define FGP_WRITE		0x00000008
-#define FGP_NOFS		0x00000010
-#define FGP_NOWAIT		0x00000020
-#define FGP_FOR_MMAP		0x00000040
-#define FGP_STABLE		0x00000080
+/**
+ * typedef fgf_t - Flags for getting folios from the page cache.
+ *
+ * Most users of the page cache will not need to use these flags;
+ * there are convenience functions such as filemap_get_folio() and
+ * filemap_lock_folio().  For users who need more control over exactly
+ * what is done with the folios, these flags to __filemap_get_folio()
+ * are available.
+ *
+ * * %FGP_ACCESSED - The folio will be marked accessed.
+ * * %FGP_LOCK - The folio is returned locked.
+ * * %FGP_CREAT - If no folio is present then a new folio is allocated,
+ *   added to the page cache and the VM's LRU list.  The folio is
+ *   returned locked.
+ * * %FGP_FOR_MMAP - The caller wants to do its own locking dance if the
+ *   folio is already in cache.  If the folio was allocated, unlock it
+ *   before returning so the caller can do the same dance.
+ * * %FGP_WRITE - The folio will be written to by the caller.
+ * * %FGP_NOFS - __GFP_FS will get cleared in gfp.
+ * * %FGP_NOWAIT - Don't block on the folio lock.
+ * * %FGP_STABLE - Wait for the folio to be stable (finished writeback).
+ * * %FGP_WRITEBEGIN - The flags to use in a filesystem write_begin()
+ *   implementation.
+ */
+typedef unsigned int __bitwise fgf_t;
+
+#define FGP_ACCESSED		((__force fgf_t)0x00000001)
+#define FGP_LOCK		((__force fgf_t)0x00000002)
+#define FGP_CREAT		((__force fgf_t)0x00000004)
+#define FGP_WRITE		((__force fgf_t)0x00000008)
+#define FGP_NOFS		((__force fgf_t)0x00000010)
+#define FGP_NOWAIT		((__force fgf_t)0x00000020)
+#define FGP_FOR_MMAP		((__force fgf_t)0x00000040)
+#define FGP_STABLE		((__force fgf_t)0x00000080)
+#define FGF_GET_ORDER(fgf)	(((__force unsigned)fgf) >> 26)	/* top 6 bits */
 
 #define FGP_WRITEBEGIN		(FGP_LOCK | FGP_WRITE | FGP_CREAT | FGP_STABLE)
 
+/**
+ * fgf_set_order - Encode a length in the fgf_t flags.
+ * @size: The suggested size of the folio to create.
+ *
+ * The caller of __filemap_get_folio() can use this to suggest a preferred
+ * size for the folio that is created.  If there is already a folio at
+ * the index, it will be returned, no matter what its size.  If a folio
+ * is freshly created, it may be of a different size than requested
+ * due to alignment constraints, memory pressure, or the presence of
+ * other folios at nearby indices.
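+ *
+ * For example, on 4KiB pages, fgf_set_order(SZ_64K) suggests an order-4
+ * folio.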
+ */
+static inline fgf_t fgf_set_order(size_t size)
+{
+	unsigned int shift = ilog2(size);
+
+	if (shift <= PAGE_SHIFT)
+		return 0;
+	return (__force fgf_t)((shift - PAGE_SHIFT) << 26);
+}
+
 void *filemap_get_entry(struct address_space *mapping, pgoff_t index);
 struct folio *__filemap_get_folio(struct address_space *mapping, pgoff_t index,
-		int fgp_flags, gfp_t gfp);
+		fgf_t fgp_flags, gfp_t gfp);
 struct page *pagecache_get_page(struct address_space *mapping, pgoff_t index,
-		int fgp_flags, gfp_t gfp);
+		fgf_t fgp_flags, gfp_t gfp);
 
 /**
  * filemap_get_folio - Find and get a folio.
@@ -590,7 +650,7 @@ static inline struct page *find_get_page(struct address_space *mapping,
 }
 
 static inline struct page *find_get_page_flags(struct address_space *mapping,
-					pgoff_t offset, int fgp_flags)
+					pgoff_t offset, fgf_t fgp_flags)
 {
 	return pagecache_get_page(mapping, offset, fgp_flags, 0);
 }
diff --git a/include/linux/uio.h b/include/linux/uio.h
index ff81e5ccaef2fac41b82ce34e065317a189cc6b8..42bce38a8e870506a09e248a32b8dea1434cba84 100644
--- a/include/linux/uio.h
+++ b/include/linux/uio.h
@@ -163,7 +163,7 @@ static inline size_t iov_length(const struct iovec *iov, unsigned long nr_segs)
 	return ret;
 }
 
-size_t copy_page_from_iter_atomic(struct page *page, unsigned offset,
+size_t copy_page_from_iter_atomic(struct page *page, size_t offset,
 				  size_t bytes, struct iov_iter *i);
 void iov_iter_advance(struct iov_iter *i, size_t bytes);
 void iov_iter_revert(struct iov_iter *i, size_t bytes);
@@ -184,6 +184,13 @@ static inline size_t copy_folio_to_iter(struct folio *folio, size_t offset,
 {
 	return copy_page_to_iter(&folio->page, offset, bytes, i);
 }
+
+static inline size_t copy_folio_from_iter_atomic(struct folio *folio,
+		size_t offset, size_t bytes, struct iov_iter *i)
+{
+	return copy_page_from_iter_atomic(&folio->page, offset, bytes, i);
+}
+
 size_t copy_page_to_iter_nofault(struct page *page, unsigned offset,
 				 size_t bytes, struct iov_iter *i);
 
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 9581b90cb459d72a400c0a45ae543b7218cc0af5..b3435033fadfb79199f33632d906c82cf04884e9 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -105,6 +105,7 @@ int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	} else {
 		rw->kiocb.ki_ioprio = get_current_ioprio();
 	}
+	rw->kiocb.dio_complete = NULL;
 
 	rw->addr = READ_ONCE(sqe->addr);
 	rw->len = READ_ONCE(sqe->len);
@@ -280,6 +281,15 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
 
 void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
 {
+	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
+	struct kiocb *kiocb = &rw->kiocb;
+
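+	/*
+	 * If the lower layer deferred its completion to us, run the handler
+	 * now that we are in task context and pick up the real result.
+	 */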
+	if ((kiocb->ki_flags & IOCB_DIO_CALLER_COMP) && kiocb->dio_complete) {
+		long res = kiocb->dio_complete(rw->kiocb.private);
+
+		io_req_set_res(req, io_fixup_rw_res(req, res), 0);
+	}
+
 	io_req_io_end(req);
 
 	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
@@ -295,9 +305,11 @@ static void io_complete_rw(struct kiocb *kiocb, long res)
 	struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
 	struct io_kiocb *req = cmd_to_io_kiocb(rw);
 
-	if (__io_complete_rw_common(req, res))
-		return;
-	io_req_set_res(req, io_fixup_rw_res(req, res), 0);
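+	/*
+	 * If a deferred completion handler is set, the final result will be
+	 * produced by ->dio_complete() from io_req_rw_complete() instead.
+	 */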
+	if (!kiocb->dio_complete || !(kiocb->ki_flags & IOCB_DIO_CALLER_COMP)) {
+		if (__io_complete_rw_common(req, res))
+			return;
+		io_req_set_res(req, io_fixup_rw_res(req, res), 0);
+	}
 	req->io_task_work.func = io_req_rw_complete;
 	__io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
 }
@@ -901,6 +913,15 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
 		kiocb_start_write(kiocb);
 	kiocb->ki_flags |= IOCB_WRITE;
 
+	/*
+	 * For non-polled IO, set IOCB_DIO_CALLER_COMP, stating that our handler
+	 * groks deferring the completion to task context. This isn't
+	 * necessary or useful for polled IO as that can always complete
+	 * directly.
+	 */
+	if (!(kiocb->ki_flags & IOCB_HIPRI))
+		kiocb->ki_flags |= IOCB_DIO_CALLER_COMP;
+
 	if (likely(req->file->f_op->write_iter))
 		ret2 = call_write_iter(req->file, kiocb, &s->iter);
 	else if (req->file->f_op->write)
diff --git a/lib/iov_iter.c b/lib/iov_iter.c
index e4dc809d10754fffca03b5f141d981d8805f9803..424737045b9798291a0a66a74a99859516236577 100644
--- a/lib/iov_iter.c
+++ b/lib/iov_iter.c
@@ -566,24 +566,37 @@ size_t iov_iter_zero(size_t bytes, struct iov_iter *i)
 }
 EXPORT_SYMBOL(iov_iter_zero);
 
-size_t copy_page_from_iter_atomic(struct page *page, unsigned offset, size_t bytes,
-				  struct iov_iter *i)
+size_t copy_page_from_iter_atomic(struct page *page, size_t offset,
+		size_t bytes, struct iov_iter *i)
 {
-	char *kaddr = kmap_atomic(page), *p = kaddr + offset;
-	if (!page_copy_sane(page, offset, bytes)) {
-		kunmap_atomic(kaddr);
+	size_t n, copied = 0;
+
+	if (!page_copy_sane(page, offset, bytes))
 		return 0;
-	}
-	if (WARN_ON_ONCE(!i->data_source)) {
-		kunmap_atomic(kaddr);
+	if (WARN_ON_ONCE(!i->data_source))
 		return 0;
-	}
-	iterate_and_advance(i, bytes, base, len, off,
-		copyin(p + off, base, len),
-		memcpy_from_iter(i, p + off, base, len)
-	)
-	kunmap_atomic(kaddr);
-	return bytes;
+
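+	/*
+	 * A highmem page must be mapped and copied one PAGE_SIZE piece at a
+	 * time; otherwise the whole range is copied under a single mapping.
+	 */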
+	do {
+		char *p;
+
+		n = bytes - copied;
+		if (PageHighMem(page)) {
+			page += offset / PAGE_SIZE;
+			offset %= PAGE_SIZE;
+			n = min_t(size_t, n, PAGE_SIZE - offset);
+		}
+
+		p = kmap_atomic(page) + offset;
+		iterate_and_advance(i, n, base, len, off,
+			copyin(p + off, base, len),
+			memcpy_from_iter(i, p + off, base, len)
+		)
+		kunmap_atomic(p);
+		copied += n;
+		offset += n;
+	} while (PageHighMem(page) && copied != bytes && n > 0);
+
+	return copied;
 }
 EXPORT_SYMBOL(copy_page_from_iter_atomic);
 
diff --git a/mm/filemap.c b/mm/filemap.c
index 9e44a49bbd74d7ba2cd55849ff0074e4c5318c60..baafbf324c9f2bba3752c1b40bd846f404e2b2c3 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1855,30 +1855,15 @@ void *filemap_get_entry(struct address_space *mapping, pgoff_t index)
  *
  * Looks up the page cache entry at @mapping & @index.
  *
- * @fgp_flags can be zero or more of these flags:
- *
- * * %FGP_ACCESSED - The folio will be marked accessed.
- * * %FGP_LOCK - The folio is returned locked.
- * * %FGP_CREAT - If no page is present then a new page is allocated using
- *   @gfp and added to the page cache and the VM's LRU list.
- *   The page is returned locked and with an increased refcount.
- * * %FGP_FOR_MMAP - The caller wants to do its own locking dance if the
- *   page is already in cache.  If the page was allocated, unlock it before
- *   returning so the caller can do the same dance.
- * * %FGP_WRITE - The page will be written to by the caller.
- * * %FGP_NOFS - __GFP_FS will get cleared in gfp.
- * * %FGP_NOWAIT - Don't get blocked by page lock.
- * * %FGP_STABLE - Wait for the folio to be stable (finished writeback)
- *
  * If %FGP_LOCK or %FGP_CREAT are specified then the function may sleep even
  * if the %GFP flags specified for %FGP_CREAT are atomic.
  *
- * If there is a page cache page, it is returned with an increased refcount.
+ * If this function returns a folio, it is returned with an increased refcount.
  *
  * Return: The found folio or an ERR_PTR() otherwise.
  */
 struct folio *__filemap_get_folio(struct address_space *mapping, pgoff_t index,
-		int fgp_flags, gfp_t gfp)
+		fgf_t fgp_flags, gfp_t gfp)
 {
 	struct folio *folio;
 
@@ -1920,7 +1905,9 @@ struct folio *__filemap_get_folio(struct address_space *mapping, pgoff_t index,
 		folio_wait_stable(folio);
 no_page:
 	if (!folio && (fgp_flags & FGP_CREAT)) {
+		unsigned order = FGF_GET_ORDER(fgp_flags);
 		int err;
+
 		if ((fgp_flags & FGP_WRITE) && mapping_can_writeback(mapping))
 			gfp |= __GFP_WRITE;
 		if (fgp_flags & FGP_NOFS)
@@ -1929,26 +1916,44 @@ struct folio *__filemap_get_folio(struct address_space *mapping, pgoff_t index,
 			gfp &= ~GFP_KERNEL;
 			gfp |= GFP_NOWAIT | __GFP_NOWARN;
 		}
-
-		folio = filemap_alloc_folio(gfp, 0);
-		if (!folio)
-			return ERR_PTR(-ENOMEM);
-
 		if (WARN_ON_ONCE(!(fgp_flags & (FGP_LOCK | FGP_FOR_MMAP))))
 			fgp_flags |= FGP_LOCK;
 
-		/* Init accessed so avoid atomic mark_page_accessed later */
-		if (fgp_flags & FGP_ACCESSED)
-			__folio_set_referenced(folio);
+		if (!mapping_large_folio_support(mapping))
+			order = 0;
+		if (order > MAX_PAGECACHE_ORDER)
+			order = MAX_PAGECACHE_ORDER;
+		/* If we're not aligned, allocate a smaller folio */
+		if (index & ((1UL << order) - 1))
+			order = __ffs(index);
 
-		err = filemap_add_folio(mapping, folio, index, gfp);
-		if (unlikely(err)) {
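+		/*
+		 * Try progressively smaller orders until the allocation and
+		 * page cache insertion succeed; order-1 folios are not
+		 * supported, so that step falls back to order 0.
+		 */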
+		do {
+			gfp_t alloc_gfp = gfp;
+
+			err = -ENOMEM;
+			if (order == 1)
+				order = 0;
+			if (order > 0)
+				alloc_gfp |= __GFP_NORETRY | __GFP_NOWARN;
+			folio = filemap_alloc_folio(alloc_gfp, order);
+			if (!folio)
+				continue;
+
+			/* Init accessed so avoid atomic mark_page_accessed later */
+			if (fgp_flags & FGP_ACCESSED)
+				__folio_set_referenced(folio);
+
+			err = filemap_add_folio(mapping, folio, index, gfp);
+			if (!err)
+				break;
 			folio_put(folio);
 			folio = NULL;
-			if (err == -EEXIST)
-				goto repeat;
-		}
+		} while (order-- > 0);
 
+		if (err == -EEXIST)
+			goto repeat;
+		if (err)
+			return ERR_PTR(err);
 		/*
 		 * filemap_add_folio locks the page, and for mmap
 		 * we expect an unlocked page.
diff --git a/mm/folio-compat.c b/mm/folio-compat.c
index c6f056c2050340fb77f62c7b486ab0d86dc529ce..10c3247542cbefc18871f109c1c70a2ff2bfb12f 100644
--- a/mm/folio-compat.c
+++ b/mm/folio-compat.c
@@ -92,7 +92,7 @@ EXPORT_SYMBOL(add_to_page_cache_lru);
 
 noinline
 struct page *pagecache_get_page(struct address_space *mapping, pgoff_t index,
-		int fgp_flags, gfp_t gfp)
+		fgf_t fgp_flags, gfp_t gfp)
 {
 	struct folio *folio;
 
diff --git a/mm/readahead.c b/mm/readahead.c
index a9c999aa19af65e8ab9be6163481a74bb6af85b7..e815c114de21e3f2043cade525195713b7e703fd 100644
--- a/mm/readahead.c
+++ b/mm/readahead.c
@@ -461,19 +461,6 @@ static int try_context_readahead(struct address_space *mapping,
 	return 1;
 }
 
-/*
- * There are some parts of the kernel which assume that PMD entries
- * are exactly HPAGE_PMD_ORDER.  Those should be fixed, but until then,
- * limit the maximum allocation order to PMD size.  I'm not aware of any
- * assumptions about maximum order if THP are disabled, but 8 seems like
- * a good order (that's 1MB if you're using 4kB pages)
- */
-#ifdef CONFIG_TRANSPARENT_HUGEPAGE
-#define MAX_PAGECACHE_ORDER	HPAGE_PMD_ORDER
-#else
-#define MAX_PAGECACHE_ORDER	8
-#endif
-
 static inline int ra_alloc_folio(struct readahead_control *ractl, pgoff_t index,
 		pgoff_t mark, unsigned int order, gfp_t gfp)
 {